From 087f51e018a20449ca2875fded8f68d2ef7abf9e Mon Sep 17 00:00:00 2001 From: getong Date: Thu, 17 Aug 2017 11:42:19 +0000 Subject: [PATCH 01/10] add erlang 20 --- .travis.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/.travis.yml b/.travis.yml index 15834e620..9f17acf2c 100644 --- a/.travis.yml +++ b/.travis.yml @@ -4,6 +4,7 @@ otp_release: - 19.0 - 19.1 - 19.2 + - 20.0 script: - make From 363928505ac9c6f2182a6ce5e27e38eded1dfc20 Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Wed, 23 Aug 2017 10:04:45 +0800 Subject: [PATCH 02/10] Depends on Erlang/OTP R20 --- .travis.yml | 3 --- 1 file changed, 3 deletions(-) diff --git a/.travis.yml b/.travis.yml index 9f17acf2c..2266ed9e3 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,9 +1,6 @@ language: erlang otp_release: - - 19.0 - - 19.1 - - 19.2 - 20.0 script: From 88f7e71f29e7d6dcb539919adcc9dbd8e9851fc5 Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Wed, 23 Aug 2017 11:08:03 +0800 Subject: [PATCH 03/10] Update rebar.config --- rebar.config | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rebar.config b/rebar.config index 834382f05..e2d3f91cc 100644 --- a/rebar.config +++ b/rebar.config @@ -1,4 +1,4 @@ {deps, [ -{goldrush,".*",{git,"https://github.com/basho/goldrush","0.1.9"}},{gproc,".*",{git,"https://github.com/uwiger/gproc",""}},{lager,".*",{git,"https://github.com/basho/lager","master"}},{esockd,".*",{git,"https://github.com/emqtt/esockd","master"}},{ekka,".*",{git,"https://github.com/emqtt/ekka","develop"}},{mochiweb,".*",{git,"https://github.com/emqtt/mochiweb","master"}},{pbkdf2,".*",{git,"https://github.com/emqtt/pbkdf2","2.0.1"}},{lager_syslog,".*",{git,"https://github.com/basho/lager_syslog",""}},{bcrypt,".*",{git,"https://github.com/smarkets/erlang-bcrypt","master"}} +{goldrush,".*",{git,"https://github.com/basho/goldrush","0.1.9"}},{gproc,".*",{git,"https://github.com/uwiger/gproc",""}},{lager,".*",{git,"https://github.com/basho/lager","master"}},{esockd,".*",{git,"https://github.com/emqtt/esockd","master"}},{ekka,".*",{git,"https://github.com/emqtt/ekka","develop"}},{mochiweb,".*",{git,"https://github.com/emqtt/mochiweb","master"}},{pbkdf2,".*",{git,"https://github.com/emqtt/pbkdf2","2.0.1"}},{lager_syslog,".*",{git,"https://github.com/basho/lager_syslog",""}},{bcrypt,".*",{git,"https://github.com/smarkets/erlang-bcrypt","master"}},{clique,".*",{git,"https://github.com/emqtt/clique",""}},{jsx,".*",{git,"https://github.com/talentdeficit/jsx",""}} ]}. {erl_opts, [debug_info,{parse_transform,lager_transform}]}. From 947df6ce35d74bb134bac5cc224dc34e2ea3c700 Mon Sep 17 00:00:00 2001 From: HuangDan Date: Wed, 23 Aug 2017 21:45:11 +0800 Subject: [PATCH 04/10] Add test cases for topic match --- test/emqttd_SUITE.erl | 6 +++--- test/emqttd_topic_SUITE.erl | 10 +++++++++- 2 files changed, 12 insertions(+), 4 deletions(-) diff --git a/test/emqttd_SUITE.erl b/test/emqttd_SUITE.erl index ed8896f0d..195820248 100644 --- a/test/emqttd_SUITE.erl +++ b/test/emqttd_SUITE.erl @@ -129,7 +129,6 @@ init_per_suite(Config) -> NewConfig = generate_config(), lists:foreach(fun set_app_env/1, NewConfig), application:ensure_all_started(?APP), - timer:sleep(6000), Config. end_per_suite(_Config) -> @@ -590,8 +589,9 @@ conflict_listeners(_) -> {current_clients, esockd:get_current_clients(Pid)}, {shutdown_count, esockd:get_shutdown_count(Pid)}]} end, esockd:listeners()), - ?assertEqual(1, proplists:get_value(current_clients, proplists:get_value("mqtt:tcp:0.0.0.0:1883", Listeners))), - ?assertEqual([{conflict,1}], proplists:get_value(shutdown_count, proplists:get_value("mqtt:tcp:0.0.0.0:1883", Listeners))), + L =proplists:get_value("mqtt:tcp:0.0.0.0:1883", Listeners), + ?assertEqual(1, proplists:get_value(current_clients, L)), + ?assertEqual(1, proplists:get_value(conflict, L)), emqttc:disconnect(C2). cli_vm(_) -> diff --git a/test/emqttd_topic_SUITE.erl b/test/emqttd_topic_SUITE.erl index a43875fba..b1ea4d8ed 100644 --- a/test/emqttd_topic_SUITE.erl +++ b/test/emqttd_topic_SUITE.erl @@ -26,7 +26,7 @@ -define(N, 10000). -all() -> [t_wildcard, t_match, t_match2, t_validate, t_triples, t_join, +all() -> [t_wildcard, t_match, t_match2, t_match3, t_validate, t_triples, t_join, t_words, t_systop, t_feed_var, t_sys_match, 't_#_match', t_sigle_level_validate, t_sigle_level_match, t_match_perf, t_triples_perf, t_parse]. @@ -71,6 +71,14 @@ t_match2(_) -> false = match(<<"$shared/x/y">>, <<"+/+/#">>), false = match(<<"house/1/sensor/0">>, <<"house/+">>). +t_match3(_) -> + true = match(<<"device/60019423a83c/fw">>, <<"device/60019423a83c/#">>), + false = match(<<"device/60019423a83c/$fw">>, <<"device/60019423a83c/#">>), + true = match(<<"device/60019423a83c/$fw/fw">>, <<"device/60019423a83c/$fw/#">>), + true = match(<<"device/60019423a83c/fw/checksum">>, <<"device/60019423a83c/#">>), + false = match(<<"device/60019423a83c/$fw/checksum">>, <<"device/60019423a83c/#">>), + true = match(<<"device/60019423a83c/dust/type">>, <<"device/60019423a83c/#">>). + t_sigle_level_match(_) -> true = match(<<"sport/tennis/player1">>, <<"sport/tennis/+">>), false = match(<<"sport/tennis/player1/ranking">>, <<"sport/tennis/+">>), From d3f52898ee03dc70b2a7f023b46dca1ee4cf998b Mon Sep 17 00:00:00 2001 From: turtled Date: Tue, 5 Sep 2017 16:52:07 +0800 Subject: [PATCH 05/10] Add Hot configuration plugin --- include/emqttd_internal.hrl | 3 +- priv/emq.schema | 12 +-- src/emqttd_config.erl | 43 ++++++++- src/emqttd_http.erl | 19 ++-- src/emqttd_mgmt.erl | 163 +++++++++++++++++++++++-------- src/emqttd_rest_api.erl | 185 +++++++++++++++++++++++++++--------- 6 files changed, 317 insertions(+), 108 deletions(-) diff --git a/include/emqttd_internal.hrl b/include/emqttd_internal.hrl index 06398e31f..343be68e4 100644 --- a/include/emqttd_internal.hrl +++ b/include/emqttd_internal.hrl @@ -72,5 +72,6 @@ -define(ERROR10, 110). %% Plugin has been loaded -define(ERROR11, 111). %% Plugin has been loaded -define(ERROR12, 112). %% Client not online --define(ERROR13, 113). %% Modify config fail +-define(ERROR13, 113). %% User already exist +-define(ERROR14, 114). %% OldPassword error diff --git a/priv/emq.schema b/priv/emq.schema index bd533d05e..ce4baf36b 100644 --- a/priv/emq.schema +++ b/priv/emq.schema @@ -624,13 +624,13 @@ end}. %% @doc Low-water mark of queued messages {mapping, "mqtt.mqueue.low_watermark", "emqttd.mqueue", [ {default, "20%"}, - {datatype, string} + {datatype, {percent, float}} ]}. %% @doc High-water mark of queued messages {mapping, "mqtt.mqueue.high_watermark", "emqttd.mqueue", [ {default, "60%"}, - {datatype, string} + {datatype, {percent, float}} ]}. %% @doc Queue Qos0 messages? @@ -640,14 +640,10 @@ end}. ]}. {translation, "emqttd.mqueue", fun(Conf) -> - Parse = fun(S) -> - {match, [N]} = re:run(S, "^([0-9]+)%$", [{capture, all_but_first, list}]), - list_to_integer(N) / 100 - end, Opts = [{type, cuttlefish:conf_get("mqtt.mqueue.type", Conf, simple)}, {max_length, cuttlefish:conf_get("mqtt.mqueue.max_length", Conf)}, - {low_watermark, Parse(cuttlefish:conf_get("mqtt.mqueue.low_watermark", Conf))}, - {high_watermark, Parse(cuttlefish:conf_get("mqtt.mqueue.high_watermark", Conf))}, + {low_watermark, cuttlefish:conf_get("mqtt.mqueue.low_watermark", Conf)}, + {high_watermark, cuttlefish:conf_get("mqtt.mqueue.high_watermark", Conf)}, {store_qos0, cuttlefish:conf_get("mqtt.mqueue.store_qos0", Conf)}], case cuttlefish:conf_get("mqtt.mqueue.priority", Conf) of undefined -> Opts; diff --git a/src/emqttd_config.erl b/src/emqttd_config.erl index 77a4e8949..cae8ad33c 100644 --- a/src/emqttd_config.erl +++ b/src/emqttd_config.erl @@ -25,18 +25,19 @@ -module(emqttd_config). --export([read/1, dump/2, reload/1, get/2, get/3, set/3]). +-export([read/1, write/2, dump/2, reload/1, get/2, get/3, set/3]). -type(env() :: {atom(), term()}). %% @doc Read the configuration of an application. -spec(read(atom()) -> {ok, list(env())} | {error, term()}). -read(_App) -> +read(App) -> %% TODO: %% 1. Read the app.conf from etc folder %% 2. Cuttlefish to read the conf %% 3. Return the terms and schema - {error, unsupported}. + % {error, unsupported}. + {ok, read_(App)}. %% @doc Reload configuration of an application. -spec(reload(atom()) -> ok | {error, term()}). @@ -47,6 +48,20 @@ reload(_App) -> %% 3. set/3 to apply the config ok. +-spec(write(atom(), list(env())) -> ok | {error, term()}). +write(App, Terms) -> + Configs = lists:map(fun({Key, Val}) -> + {cuttlefish_variable:tokenize(binary_to_list(Key)), binary_to_list(Val)} + end, Terms), + Path = lists:concat([code:priv_dir(App), "/", App, ".schema"]), + Schema = cuttlefish_schema:files([Path]), + case cuttlefish_generator:map(Schema, Configs) of + [{App, Configs1}] -> + lists:foreach(fun({Key, Val}) -> application:set_env(App, Key, Val) end, Configs1); + _ -> + error + end. + -spec(dump(atom(), list(env())) -> ok | {error, term()}). dump(_App, _Terms) -> %% TODO @@ -70,3 +85,25 @@ get(App, Par) -> get(App, Par, Def) -> emqttd_cli_config:get_cfg(App, Par, Def). + +read_(App) -> + {ok, PluginsEtcDir} = emqttd:env(plugins_etc_dir), + Configs = cuttlefish_conf:file(lists:concat([PluginsEtcDir, App, ".conf"])), + Path= lists:concat([code:priv_dir(App), "/", App, ".schema"]), + {_, Mappings, _} = cuttlefish_schema:files([Path]), + OptionalCfg = lists:foldl(fun(Map, Acc) -> + Key = cuttlefish_mapping:variable(Map), + case proplists:get_value(Key, Configs) of + undefined -> + [{cuttlefish_variable:format(Key), "", cuttlefish_mapping:doc(Map), false} | Acc]; + _ -> Acc + end + end, [], Mappings), + RequiredCfg = lists:foldl(fun({Key, Val}, Acc) -> + case lists:keyfind(Key, 2, Mappings) of + false -> Acc; + Map -> + [{cuttlefish_variable:format(Key), Val, cuttlefish_mapping:doc(Map), true} | Acc] + end + end, [], Configs), + RequiredCfg ++ OptionalCfg. diff --git a/src/emqttd_http.erl b/src/emqttd_http.erl index 4846bdb50..c1b255a40 100644 --- a/src/emqttd_http.erl +++ b/src/emqttd_http.erl @@ -51,8 +51,15 @@ handle_request(Req, State) -> handle_request("/status", Req, Req:get(method)); "/" -> handle_request("/", Req, Req:get(method)); + "/api/v2/auth" -> + handle_request(Path, Req, State); _ -> - if_authorized(Req, fun() -> handle_request(Path, Req, State) end) + Host = Req:get_header_value("Host"), + [_, Port] = string:tokens(Host, ":"), + case Port of + "18083" -> handle_request(Path, Req, State); + _ -> if_authorized(Req, fun() -> handle_request(Path, Req, State) end) + end end. handle_request("/api/v2/" ++ Url, Req, #state{dispatch = Dispatch}) -> @@ -125,14 +132,8 @@ authorized(Req) -> false; "Basic " ++ BasicAuth -> {Username, Password} = user_passwd(BasicAuth), - {ok, Peer} = Req:get(peername), - Params = params(Req), - ClientId = get_value(<<"client">>, Params, http), - case emqttd_access_control:auth(#mqtt_client{client_id = ClientId, - username = Username, - peername = Peer}, Password) of - ok -> true; - {ok, _IsSuper} -> + case emqttd_mgmt:check_user(Username, Password) of + ok -> true; {error, Reason} -> lager:error("HTTP Auth failure: username=~s, reason=~p", [Username, Reason]), diff --git a/src/emqttd_mgmt.erl b/src/emqttd_mgmt.erl index 56675bbea..a04926102 100644 --- a/src/emqttd_mgmt.erl +++ b/src/emqttd_mgmt.erl @@ -26,6 +26,8 @@ -include_lib("stdlib/include/qlc.hrl"). +-include_lib("emq_dashboard/include/emq_dashboard.hrl"). + -import(proplists, [get_value/2]). -export([brokers/0, broker/1, metrics/0, metrics/1, stats/1, stats/0, @@ -43,14 +45,16 @@ -export([kick_client/1, clean_acl_cache/2]). --export([modify_config/3, modify_config/4, get_configs/0, get_config/1]). +-export([modify_config/2, modify_config/3, modify_config/4, get_configs/0, get_config/1, + get_plugin_config/1, get_plugin_config/2, modify_plugin_config/2, modify_plugin_config/3]). + +-export([add_user/3, check_user/2, user_list/0, lookup_user/1, + update_user/2, change_password/3, remove_user/1]). -define(KB, 1024). -define(MB, (1024*1024)). -define(GB, (1024*1024*1024)). --define(EMPTY_KEY(Key), ((Key == undefined) orelse (Key == <<>>))). - brokers() -> [{Node, broker(Node)} || Node <- ekka_mnesia:running_nodes()]. @@ -235,7 +239,7 @@ subscribe({ClientId, Topic, Qos}) -> {error, format_error(Topic, "validate topic: ${0} fail")} end. -unsubscribe({ClientId, Topic})-> +unsubscribe({ClientId, Topic}) -> case validate(topic, Topic) of true -> case emqttd_sm:lookup_session(ClientId) of @@ -248,43 +252,6 @@ unsubscribe({ClientId, Topic})-> false -> {error, format_error(Topic, "validate topic: ${0} fail")} end. - -% publish(Messages) -> -% lists:foldl( -% fun({ClientId, Topic, Payload, Qos, Retain}, {Success, Failed}) -> -% case validate(topic, Topic) of -% true -> -% Msg = emqttd_message:make(ClientId, Qos, Topic, Payload), -% emqttd:publish(Msg#mqtt_message{retain = Retain}), -% {[[{topic, Topic}]| Success], Failed}; -% false -> -% {Success, [[{topic, Topic}]| Failed]} -% end -% end, {[], []}, Messages). - -% subscribers(Subscribers) -> -% lists:foldl( -% fun({ClientId, Topic, Qos}, {Success, Failed}) -> -% case emqttd_sm:lookup_session(ClientId) of -% undefined -> -% {Success, [[{client_id, ClientId}]|Failed]}; -% #mqtt_session{sess_pid = SessPid} -> -% emqttd_session:subscribe(SessPid, [{Topic, [{qos, Qos}]}]), -% {[[{client_id, ClientId}]| Success], Failed} -% end -% end,{[], []}, Subscribers). - -% unsubscribers(UnSubscribers)-> -% lists:foldl( -% fun({ClientId, Topic}, {Success, Failed}) -> -% case emqttd_sm:lookup_session(ClientId) of -% undefined -> -% {Success, [[{client_id, ClientId}]|Failed]}; -% #mqtt_session{sess_pid = SessPid} -> -% emqttd_session:unsubscriber(SessPid, [{Topic, []}]), -% {[[{client_id, ClientId}]| Success], Failed} -% end -% end, {[], []}, UnSubscribers). %%-------------------------------------------------------------------- %% manager API @@ -317,6 +284,9 @@ clean_acl_cache(Node, ClientId, Topic) -> %%-------------------------------------------------------------------- %% Config ENV %%-------------------------------------------------------------------- +modify_config(App, Terms) -> + emqttd_config:write(App, Terms). + modify_config(App, Key, Value) -> Result = [modify_config(Node, App, Key, Value) || Node <- ekka_mnesia:running_nodes()], lists:any(fun(Item) -> Item =:= ok end, Result). @@ -334,6 +304,103 @@ get_config(Node) when Node =:= node()-> get_config(Node) -> rpc_call(Node, get_config, [Node]). +get_plugin_config(PluginName) -> + emqttd_config:read(PluginName). +get_plugin_config(Node, PluginName) -> + rpc_call(Node, get_plugin_config, [PluginName]). + +modify_plugin_config(PluginName, Terms) -> + emqttd_config:write(PluginName, Terms). +modify_plugin_config(Node, PluginName, Terms) -> + rpc_call(Node, modify_plugin_config, [PluginName, Terms]). + +%%-------------------------------------------------------------------- +%% manager user API +%%-------------------------------------------------------------------- +check_user(undefined, _) -> + {error, "Username undefined"}; +check_user(_, undefined) -> + {error, "Password undefined"}; +check_user(Username, Password) -> + case mnesia:dirty_read(mqtt_admin, Username) of + [#mqtt_admin{password = <>}] -> + case Hash =:= md5_hash(Salt, Password) of + true -> ok; + false -> {error, "Password error"} + end; + [] -> + {error, "User not found"} + end. + +add_user(Username, Password, Tag) -> + Admin = #mqtt_admin{username = Username, + password = hash(Password), + tags = Tag}, + return(mnesia:transaction(fun add_user_/1, [Admin])). + +add_user_(Admin = #mqtt_admin{username = Username}) -> + case mnesia:wread({mqtt_admin, Username}) of + [] -> mnesia:write(Admin); + [_] -> {error, [{code, ?ERROR13}, {message, <<"User already exist">>}]} + end. + +user_list() -> + [row(Admin) || Admin <- ets:tab2list(mqtt_admin)]. + +lookup_user(Username) -> + Admin = mnesia:dirty_read(mqtt_admin, Username), + row(Admin). + +update_user(Username, Params) -> + case mnesia:dirty_read({mqtt_admin, Username}) of + [] -> + {error, [{code, ?ERROR5}, {message, <<"User not found">>}]}; + [User] -> + Admin = case proplists:get_value(<<"tags">>, Params) of + undefined -> User; + Tag -> User#mqtt_admin{tags = Tag} + end, + return(mnesia:transaction(fun() -> mnesia:write(Admin) end)) + end. + +remove_user(Username) -> + Trans = fun() -> + case lookup_user(Username) of + [] -> {error, [{code, ?ERROR5}, {message, <<"User not found">>}]}; + _ -> mnesia:delete({mqtt_admin, Username}) + end + end, + return(mnesia:transaction(Trans)). + +change_password(Username, OldPwd, NewPwd) -> + Trans = fun() -> + case mnesia:wread({mqtt_admin, Username}) of + [Admin = #mqtt_admin{password = <>}] -> + case Hash =:= md5_hash(Salt, OldPwd) of + true -> + mnesia:write(Admin#mqtt_admin{password = hash(NewPwd)}); + false -> + {error, [{code, ?ERROR14}, {message, <<"OldPassword error">>}]} + end; + [] -> + {error, [{code, ?ERROR5}, {message, <<"User not found">>}]} + end + end, + return(mnesia:transaction(Trans)). + +return({atomic, ok}) -> + ok; +return({atomic, Error}) -> + Error; +return({aborted, Reason}) -> + lager:error("Mnesia Transaction error:~p~n", [Reason]), + error. + +row(#mqtt_admin{username = Username, tags = Tags}) -> + [{username, Username}, {tags, Tags}]; +row([#mqtt_admin{username = Username, tags = Tags}]) -> + [{username, Username}, {tags, Tags}]; +row([]) ->[]. %%-------------------------------------------------------------------- %% Internel Functions. %%-------------------------------------------------------------------- @@ -413,3 +480,17 @@ tables() -> format_error(Val, Msg) -> re:replace(Msg, <<"\\$\\{[^}]+\\}">>, Val, [global, {return, binary}]). +hash(Password) -> + SaltBin = salt(), + <>. + +md5_hash(SaltBin, Password) -> + erlang:md5(<>). + +salt() -> + seed(), + Salt = rand:uniform(16#ffffffff), + <>. + +seed() -> + rand:seed(exsplus, erlang:timestamp()). diff --git a/src/emqttd_rest_api.erl b/src/emqttd_rest_api.erl index adb6e5af6..5d3f3a2fd 100644 --- a/src/emqttd_rest_api.erl +++ b/src/emqttd_rest_api.erl @@ -60,6 +60,23 @@ -http_api({"^configs/?$", 'GET', config_list, []}). -http_api({"^nodes/(.+?)/configs/(.+?)/?$", 'PUT', modify_config, [{<<"key">>, binary}, {<<"value">>, binary}]}). -http_api({"^nodes/(.+?)/configs/?$", 'GET', config_list, []}). +-http_api({"^nodes/(.+?)/plugin_configs/(.+?)/?$", 'GET', plugin_config_list, []}). +-http_api({"^nodes/(.+?)/plugin_configs/(.+?)/?$", 'PUT', modify_plugin_config, []}). + +-http_api({"^users/?$", 'GET', users, []}). +-http_api({"^users/?$", 'POST', users, [{<<"username">>, binary}, + {<<"password">>, binary}, + {<<"tag">>, binary}]}). +-http_api({"^users/(.+?)/?$", 'GET', users, []}). +-http_api({"^users/(.+?)/?$", 'PUT', users, []}). +-http_api({"^users/(.+?)/?$", 'DELETE', users, []}). + +-http_api({"^auth/?$", 'POST', auth, [{<<"username">>, binary}, {<<"password">>, binary}]}). +-http_api({"^change_pwd/(.+?)/?$", 'PUT', change_pwd, [{<<"old_pwd">>, binary}, + {<<"new_pwd">>, binary}]}). + +-import(proplists, [get_value/2, get_value/3]). + -export([alarm_list/3]). -export([client/3, client_list/3, client_list/4, kick_client/3, clean_acl_cache/3]). -export([route/3, route_list/2]). @@ -68,7 +85,10 @@ -export([nodes/2, node/3, brokers/2, broker/3, listeners/2, listener/3, metrics/2, metric/3, stats/2, stat/3]). -export([publish/2, subscribe/2, unsubscribe/2]). -export([plugin_list/3, enabled/4]). --export([modify_config/3, modify_config/4, config_list/2, config_list/3]). +-export([modify_config/3, modify_config/4, config_list/2, config_list/3, + plugin_config_list/4, modify_plugin_config/4]). + +-export([users/2,users/3, auth/2, change_pwd/3]). %%-------------------------------------------------------------------------- %% alarm @@ -98,9 +118,9 @@ client('GET', _Params, Key) -> client_list('GET', Params, Node) -> {PageNo, PageSize} = page_params(Params), Data = emqttd_mgmt:client_list(l2a(Node), undefined, PageNo, PageSize), - Rows = proplists:get_value(result, Data), - TotalPage = proplists:get_value(totalPage, Data), - TotalNum = proplists:get_value(totalNum, Data), + Rows = get_value(result, Data), + TotalPage = get_value(totalPage, Data), + TotalNum = get_value(totalNum, Data), {ok, [{current_page, PageNo}, {page_size, PageSize}, {total_num, TotalNum}, @@ -119,7 +139,7 @@ kick_client('DELETE', _Params, Key) -> end. clean_acl_cache('PUT', Params, Key) -> - Topic = proplists:get_value(<<"topic">>, Params), + Topic = get_value(<<"topic">>, Params), case emqttd_mgmt:clean_acl_cache(l2b(Key), Topic) of true -> {ok, []}; false -> {error, [{code, ?ERROR12}]} @@ -151,9 +171,9 @@ route('GET', _Params, Key) -> route_list('GET', Params) -> {PageNo, PageSize} = page_params(Params), Data = emqttd_mgmt:route_list(undefined, PageNo, PageSize), - Rows = proplists:get_value(result, Data), - TotalPage = proplists:get_value(totalPage, Data), - TotalNum = proplists:get_value(totalNum, Data), + Rows = get_value(result, Data), + TotalPage = get_value(totalPage, Data), + TotalNum = get_value(totalNum, Data), {ok, [{current_page, PageNo}, {page_size, PageSize}, {total_num, TotalNum}, @@ -176,9 +196,9 @@ session('GET', _Params, Key) -> session_list('GET', Params, Node) -> {PageNo, PageSize} = page_params(Params), Data = emqttd_mgmt:session_list(l2a(Node), undefined, PageNo, PageSize), - Rows = proplists:get_value(result, Data), - TotalPage = proplists:get_value(totalPage, Data), - TotalNum = proplists:get_value(totalNum, Data), + Rows = get_value(result, Data), + TotalPage = get_value(totalPage, Data), + TotalNum = get_value(totalNum, Data), {ok, [{current_page, PageNo}, {page_size, PageSize}, {total_num, TotalNum}, @@ -193,7 +213,7 @@ session_list('GET', Params, Node, ClientId) -> session_row({ClientId, _Pid, _Persistent, Session}) -> InfoKeys = [clean_sess, max_inflight, inflight_queue, message_queue, message_dropped, awaiting_rel, awaiting_ack, awaiting_comp, created_at], - [{client_id, ClientId} | [{Key, format(Key, proplists:get_value(Key, Session))} || Key <- InfoKeys]]. + [{client_id, ClientId} | [{Key, format(Key, get_value(Key, Session))} || Key <- InfoKeys]]. %%-------------------------------------------------------------------------- %% subscription @@ -205,9 +225,9 @@ subscription('GET', _Params, Key) -> subscription_list('GET', Params, Node) -> {PageNo, PageSize} = page_params(Params), Data = emqttd_mgmt:subscription_list(l2a(Node), undefined, PageNo, PageSize), - Rows = proplists:get_value(result, Data), - TotalPage = proplists:get_value(totalPage, Data), - TotalNum = proplists:get_value(totalNum, Data), + Rows = get_value(result, Data), + TotalPage = get_value(totalPage, Data), + TotalNum = get_value(totalNum, Data), {ok, [{current_page, PageNo}, {page_size, PageSize}, {total_num, TotalNum}, @@ -222,7 +242,7 @@ subscription_list('GET', Params, Node, Key) -> subscription_row({{Topic, ClientId}, Option}) when is_pid(ClientId) -> subscription_row({{Topic, l2b(pid_to_list(ClientId))}, Option}); subscription_row({{Topic, ClientId}, Option}) -> - Qos = proplists:get_value(qos, Option), + Qos = get_value(qos, Option), [{client_id, ClientId}, {topic, Topic}, {qos, Qos}]. %%-------------------------------------------------------------------------- @@ -246,7 +266,7 @@ broker('GET', _Params, Node) -> listeners('GET', _Params) -> Data = emqttd_mgmt:listeners(), - {ok, [{Node, format_listeners(Listeners, [])} || {Node, Listeners} <- Data]}. + {ok, [[{Node, format_listeners(Listeners, [])} || {Node, Listeners} <- Data]]}. listener('GET', _Params, Node) -> Data = emqttd_mgmt:listener(l2a(Node)), @@ -254,7 +274,7 @@ listener('GET', _Params, Node) -> metrics('GET', _Params) -> Data = emqttd_mgmt:metrics(), - {ok, Data}. + {ok, [Data]}. metric('GET', _Params, Node) -> Data = emqttd_mgmt:metrics(l2a(Node)), @@ -262,7 +282,7 @@ metric('GET', _Params, Node) -> stats('GET', _Params) -> Data = emqttd_mgmt:stats(), - {ok, Data}. + {ok, [Data]}. stat('GET', _Params, Node) -> Data = emqttd_mgmt:stats(l2a(Node)), @@ -271,19 +291,19 @@ stat('GET', _Params, Node) -> format_broker(Node, Broker) -> OtpRel = "R" ++ erlang:system_info(otp_release) ++ "/" ++ erlang:system_info(version), [{name, Node}, - {version, bin(proplists:get_value(version, Broker))}, - {sysdescr, bin(proplists:get_value(sysdescr, Broker))}, - {uptime, bin(proplists:get_value(uptime, Broker))}, - {datetime, bin(proplists:get_value(datetime, Broker))}, + {version, bin(get_value(version, Broker))}, + {sysdescr, bin(get_value(sysdescr, Broker))}, + {uptime, bin(get_value(uptime, Broker))}, + {datetime, bin(get_value(datetime, Broker))}, {otp_release, l2b(OtpRel)}, {node_status, 'Running'}]. format_broker(Broker) -> OtpRel = "R" ++ erlang:system_info(otp_release) ++ "/" ++ erlang:system_info(version), - [{version, bin(proplists:get_value(version, Broker))}, - {sysdescr, bin(proplists:get_value(sysdescr, Broker))}, - {uptime, bin(proplists:get_value(uptime, Broker))}, - {datetime, bin(proplists:get_value(datetime, Broker))}, + [{version, bin(get_value(version, Broker))}, + {sysdescr, bin(get_value(sysdescr, Broker))}, + {uptime, bin(get_value(uptime, Broker))}, + {datetime, bin(get_value(datetime, Broker))}, {otp_release, l2b(OtpRel)}, {node_status, 'Running'}]. @@ -300,11 +320,11 @@ format_listener({Protocol, ListenOn, Info}) -> %% mqtt %%-------------------------------------------------------------------------- publish('POST', Params) -> - Topic = proplists:get_value(<<"topic">>, Params), - ClientId = proplists:get_value(<<"client_id">>, Params, http), - Payload = proplists:get_value(<<"payload">>, Params, <<>>), - Qos = proplists:get_value(<<"qos">>, Params, 0), - Retain = proplists:get_value(<<"retain">>, Params, false), + Topic = get_value(<<"topic">>, Params), + ClientId = get_value(<<"client_id">>, Params, http), + Payload = get_value(<<"payload">>, Params, <<>>), + Qos = get_value(<<"qos">>, Params, 0), + Retain = get_value(<<"retain">>, Params, false), case emqttd_mgmt:publish({ClientId, Topic, Payload, Qos, Retain}) of ok -> {ok, []}; @@ -313,9 +333,9 @@ publish('POST', Params) -> end. subscribe('POST', Params) -> - ClientId = proplists:get_value(<<"client_id">>, Params), - Topic = proplists:get_value(<<"topic">>, Params), - Qos = proplists:get_value(<<"qos">>, Params, 0), + ClientId = get_value(<<"client_id">>, Params), + Topic = get_value(<<"topic">>, Params), + Qos = get_value(<<"qos">>, Params, 0), case emqttd_mgmt:subscribe({ClientId, Topic, Qos}) of ok -> {ok, []}; @@ -324,8 +344,8 @@ subscribe('POST', Params) -> end. unsubscribe('POST', Params) -> - ClientId = proplists:get_value(<<"client_id">>, Params), - Topic = proplists:get_value(<<"topic">>, Params), + ClientId = get_value(<<"client_id">>, Params), + Topic = get_value(<<"topic">>, Params), case emqttd_mgmt:unsubscribe({ClientId, Topic})of ok -> {ok, []}; @@ -341,7 +361,7 @@ plugin_list('GET', _Params, Node) -> {ok, Plugins}. enabled('PUT', Params, Node, PluginName) -> - Active = proplists:get_value(<<"active">>, Params), + Active = get_value(<<"active">>, Params), case Active of true -> return(emqttd_mgmt:plugin_load(l2a(Node), l2a(PluginName))); @@ -351,6 +371,8 @@ enabled('PUT', Params, Node, PluginName) -> return(Result) -> case Result of + ok -> + {ok, []}; {ok, _} -> {ok, []}; {error, already_started} -> @@ -372,19 +394,19 @@ plugin(#mqtt_plugin{name = Name, version = Ver, descr = Descr, %% modify config %%-------------------------------------------------------------------------- modify_config('PUT', Params, App) -> - Key = proplists:get_value(<<"key">>, Params, <<"">>), - Value = proplists:get_value(<<"value">>, Params, <<"">>), + Key = get_value(<<"key">>, Params, <<"">>), + Value = get_value(<<"value">>, Params, <<"">>), case emqttd_mgmt:modify_config(l2a(App), b2l(Key), b2l(Value)) of true -> {ok, []}; - false -> {error, [{code, ?ERROR13}]} + false -> {error, [{code, ?ERROR2}]} end. modify_config('PUT', Params, Node, App) -> - Key = proplists:get_value(<<"key">>, Params, <<"">>), - Value = proplists:get_value(<<"value">>, Params, <<"">>), + Key = get_value(<<"key">>, Params, <<"">>), + Value = get_value(<<"value">>, Params, <<"">>), case emqttd_mgmt:modify_config(l2a(Node), l2a(App), b2l(Key), b2l(Value)) of ok -> {ok, []}; - _ -> {error, [{code, ?ERROR13}]} + _ -> {error, [{code, ?ERROR2}]} end. config_list('GET', _Params) -> @@ -395,6 +417,30 @@ config_list('GET', _Params, Node) -> Data = emqttd_mgmt:get_config(l2a(Node)), {ok, [format_config(Config) || Config <- lists:reverse(Data)]}. +plugin_config_list('GET', _Params, Node, App) -> + {ok, Data} = emqttd_mgmt:get_plugin_config(l2a(Node), l2a(App)), + {ok, [format_plugin_config(Config) || Config <- lists:reverse(Data)]}. + +modify_plugin_config('PUT', Params, Node, App) -> + PluginName = l2a(App), + case emqttd_mgmt:modify_plugin_config(l2a(Node), PluginName, Params) of + ok -> + Plugins = emqttd_plugins:list(), + {_, _, _, _, Status} = lists:keyfind(PluginName, 2, Plugins), + case Status of + true -> + emqttd_plugins:unload(PluginName), + timer:sleep(500), + emqttd_plugins:load(PluginName), + {ok, []}; + false -> + {ok, []} + end; + _ -> + {error, [{code, ?ERROR2}]} + end. + + format_config([], Acc) -> Acc; format_config([{Key, Value, Datatpye, App}| Configs], Acc) -> @@ -406,6 +452,53 @@ format_config({Key, Value, Datatpye, App}) -> {<<"datatpye">>, l2b(Datatpye)}, {<<"app">>, App}]. +format_plugin_config({Key, Value, Desc, Required}) -> + [{<<"key">>, l2b(Key)}, + {<<"value">>, l2b(Value)}, + {<<"desc">>, l2b(Desc)}, + {<<"required">>, Required}]. + +%%-------------------------------------------------------------------------- +%% Admin +%%-------------------------------------------------------------------------- +auth('POST', Params) -> + Username = get_value(<<"username">>, Params), + Password = get_value(<<"password">>, Params), + case emqttd_mgmt:check_user(Username, Password) of + ok -> + {ok, []}; + {error, Reason} -> + {error, [{code, ?ERROR3}, {message, list_to_binary(Reason)}]} + end. + +users('POST', Params) -> + Username = get_value(<<"username">>, Params), + Password = get_value(<<"password">>, Params), + Tag = get_value(<<"tags">>, Params), + code(emqttd_mgmt:add_user(Username, Password, Tag)); + +users('GET', _Params) -> + {ok, [Admin || Admin <- emqttd_mgmt:user_list()]}. + +users('GET', _Params, Username) -> + {ok, emqttd_mgmt:lookup_user(list_to_binary(Username))}; + +users('PUT', Params, Username) -> + code(emqttd_mgmt:update_user(list_to_binary(Username), Params)); + +users('DELETE', _Params, "admin") -> + {error, [{code, ?ERROR6}, {message, <<"admin cannot be deleted">>}]}; +users('DELETE', _Params, Username) -> + code(emqttd_mgmt:remove_user(list_to_binary(Username))). + +change_pwd('PUT', Params, Username) -> + OldPwd = get_value(<<"old_pwd">>, Params), + NewPwd = get_value(<<"new_pwd">>, Params), + code(emqttd_mgmt:change_password(list_to_binary(Username), OldPwd, NewPwd)). + +code(ok) -> {ok, []}; +code(error) -> {error, [{code, ?ERROR2}]}; +code({error, Error}) -> {error, Error}. %%-------------------------------------------------------------------------- %% Inner function %%-------------------------------------------------------------------------- @@ -446,6 +539,6 @@ b2l(B) -> binary_to_list(B). page_params(Params) -> - PageNo = int(proplists:get_value("curr_page", Params, "1")), - PageSize = int(proplists:get_value("page_size", Params, "20")), + PageNo = int(get_value("curr_page", Params, "1")), + PageSize = int(get_value("page_size", Params, "20")), {PageNo, PageSize}. From 7260b17cb536afda1b4d94aefce040559293464d Mon Sep 17 00:00:00 2001 From: turtled Date: Wed, 6 Sep 2017 17:17:29 +0800 Subject: [PATCH 06/10] Plugins configuration insert to ets table --- src/emqttd_cli_config.erl | 32 ++++++++++++++++++++++++++++++-- src/emqttd_config.erl | 6 +++--- 2 files changed, 33 insertions(+), 5 deletions(-) diff --git a/src/emqttd_cli_config.erl b/src/emqttd_cli_config.erl index aca2a45bb..489182fb8 100644 --- a/src/emqttd_cli_config.erl +++ b/src/emqttd_cli_config.erl @@ -22,15 +22,43 @@ set_usage/0, all_cfgs/0, get_cfg/2, - get_cfg/3]). + get_cfg/3, + read_config/1, + write_config/2]). -define(APP, emqttd). +-define(TAB, emqttd_config). register_config() -> application:start(clique), F = fun() -> ekka_mnesia:running_nodes() end, clique:register_node_finder(F), - register_config_cli(). + register_config_cli(), + create_config_tab(). + +create_config_tab() -> + case ets:info(?TAB, name) of + undefined -> + ets:new(?TAB, [named_table, public]), + {ok, PluginsEtcDir} = emqttd:env(plugins_etc_dir), + Files = filelib:wildcard("*.conf", PluginsEtcDir), + lists:foreach(fun(File) -> + [FileName, _] = string:split(File, "."), + Configs = cuttlefish_conf:file(lists:concat([PluginsEtcDir, File])), + ets:insert(?TAB, {list_to_atom(FileName), Configs}) + end, Files); + _ -> + ok + end. + +read_config(App) -> + case ets:lookup(?TAB, App) of + [] -> []; + [{_, Value}] -> Value + end. + +write_config(App, Terms) -> + ets:insert(?TAB, {App, Terms}). run(Cmd) -> clique:run(Cmd). diff --git a/src/emqttd_config.erl b/src/emqttd_config.erl index cae8ad33c..50561bdee 100644 --- a/src/emqttd_config.erl +++ b/src/emqttd_config.erl @@ -57,6 +57,7 @@ write(App, Terms) -> Schema = cuttlefish_schema:files([Path]), case cuttlefish_generator:map(Schema, Configs) of [{App, Configs1}] -> + emqttd_cli_config:write_config(App, Configs), lists:foreach(fun({Key, Val}) -> application:set_env(App, Key, Val) end, Configs1); _ -> error @@ -87,9 +88,8 @@ get(App, Par, Def) -> read_(App) -> - {ok, PluginsEtcDir} = emqttd:env(plugins_etc_dir), - Configs = cuttlefish_conf:file(lists:concat([PluginsEtcDir, App, ".conf"])), - Path= lists:concat([code:priv_dir(App), "/", App, ".schema"]), + Configs = emqttd_cli_config:read_config(App), + Path = lists:concat([code:priv_dir(App), "/", App, ".schema"]), {_, Mappings, _} = cuttlefish_schema:files([Path]), OptionalCfg = lists:foldl(fun(Map, Acc) -> Key = cuttlefish_mapping:variable(Map), From 85f9a8cb70fcbc247653bc8707cfb1381f6080ef Mon Sep 17 00:00:00 2001 From: turtled Date: Fri, 8 Sep 2017 23:23:48 +0800 Subject: [PATCH 07/10] Fix read plugin configuration not find schema file --- src/emqttd_config.erl | 39 ++++++++++++++++++++++----------------- 1 file changed, 22 insertions(+), 17 deletions(-) diff --git a/src/emqttd_config.erl b/src/emqttd_config.erl index 50561bdee..deaaa77d1 100644 --- a/src/emqttd_config.erl +++ b/src/emqttd_config.erl @@ -90,20 +90,25 @@ get(App, Par, Def) -> read_(App) -> Configs = emqttd_cli_config:read_config(App), Path = lists:concat([code:priv_dir(App), "/", App, ".schema"]), - {_, Mappings, _} = cuttlefish_schema:files([Path]), - OptionalCfg = lists:foldl(fun(Map, Acc) -> - Key = cuttlefish_mapping:variable(Map), - case proplists:get_value(Key, Configs) of - undefined -> - [{cuttlefish_variable:format(Key), "", cuttlefish_mapping:doc(Map), false} | Acc]; - _ -> Acc - end - end, [], Mappings), - RequiredCfg = lists:foldl(fun({Key, Val}, Acc) -> - case lists:keyfind(Key, 2, Mappings) of - false -> Acc; - Map -> - [{cuttlefish_variable:format(Key), Val, cuttlefish_mapping:doc(Map), true} | Acc] - end - end, [], Configs), - RequiredCfg ++ OptionalCfg. + case filelib:is_file(Path) of + false -> + []; + true -> + {_, Mappings, _} = cuttlefish_schema:files([Path]), + OptionalCfg = lists:foldl(fun(Map, Acc) -> + Key = cuttlefish_mapping:variable(Map), + case proplists:get_value(Key, Configs) of + undefined -> + [{cuttlefish_variable:format(Key), "", cuttlefish_mapping:doc(Map), false} | Acc]; + _ -> Acc + end + end, [], Mappings), + RequiredCfg = lists:foldl(fun({Key, Val}, Acc) -> + case lists:keyfind(Key, 2, Mappings) of + false -> Acc; + Map -> + [{cuttlefish_variable:format(Key), Val, cuttlefish_mapping:doc(Map), true} | Acc] + end + end, [], Configs), + RequiredCfg ++ OptionalCfg + end. From 8091c07d3cef03d3cd8795e9594630c75e5d993d Mon Sep 17 00:00:00 2001 From: turtled Date: Sat, 9 Sep 2017 20:55:57 +0800 Subject: [PATCH 08/10] Review code --- src/emqttd_cli_config.erl | 2 +- src/emqttd_http.erl | 2 +- src/emqttd_rest_api.erl | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/emqttd_cli_config.erl b/src/emqttd_cli_config.erl index 489182fb8..4379200b1 100644 --- a/src/emqttd_cli_config.erl +++ b/src/emqttd_cli_config.erl @@ -367,4 +367,4 @@ any_to_string(A) when is_atom(A) -> any_to_string(B) when is_binary(B) -> binary_to_list(B); any_to_string(L) when is_list(L) -> - L. \ No newline at end of file + L. diff --git a/src/emqttd_http.erl b/src/emqttd_http.erl index c1b255a40..bc5ba764d 100644 --- a/src/emqttd_http.erl +++ b/src/emqttd_http.erl @@ -214,7 +214,7 @@ api_list() -> <<"api/v2/nodes/{node_name}/clients">>, <<"api/v2/nodes/{node_name}/clients/{clientid}">>, <<"api/v2/clients/{clientid}">>, - <<"api/v2/clean_acl_cache/{clientid}">>, + <<"api/v2/clients/{clientid}/clean_acl_cache">>, <<"api/v2/nodes/{node_name}/sessions">>, <<"api/v2/nodes/{node_name}/sessions/{clientid}">>, <<"api/v2/sessions/{clientid}">>, diff --git a/src/emqttd_rest_api.erl b/src/emqttd_rest_api.erl index 5d3f3a2fd..41c689cab 100644 --- a/src/emqttd_rest_api.erl +++ b/src/emqttd_rest_api.erl @@ -25,7 +25,7 @@ -http_api({"^nodes/(.+?)/clients/(.+?)/?$", 'GET',client_list, []}). -http_api({"^clients/(.+?)/?$", 'GET', client, []}). -http_api({"^clients/(.+?)/?$", 'DELETE', kick_client, []}). --http_api({"^clean_acl_cache/(.+?)/?$", 'DELETE', clean_acl_cache, [{<<"topic">>, binary}]}). +-http_api({"^clients/(.+?)/clean_acl_cache?$", 'DELETE', clean_acl_cache, [{<<"topic">>, binary}]}). -http_api({"^routes?$", 'GET', route_list, []}). -http_api({"^routes/(.+?)/?$", 'GET', route, []}). From 424aea2878512174a5de5190d1e84a7f18234718 Mon Sep 17 00:00:00 2001 From: turtled Date: Sat, 9 Sep 2017 21:00:17 +0800 Subject: [PATCH 09/10] Review code --- src/emqttd_mgmt.erl | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/emqttd_mgmt.erl b/src/emqttd_mgmt.erl index a04926102..2c671f3bd 100644 --- a/src/emqttd_mgmt.erl +++ b/src/emqttd_mgmt.erl @@ -26,7 +26,9 @@ -include_lib("stdlib/include/qlc.hrl"). --include_lib("emq_dashboard/include/emq_dashboard.hrl"). +-record(mqtt_admin, {username, password, tags}). + +-define(EMPTY_KEY(Key), ((Key == undefined) orelse (Key == <<>>))). -import(proplists, [get_value/2]). From ee9fc5cb0e6b11f399e496ef16e129add8e402dc Mon Sep 17 00:00:00 2001 From: turtled Date: Sat, 9 Sep 2017 21:21:30 +0800 Subject: [PATCH 10/10] Review code --- src/emqttd_cli_config.erl | 12 ++---------- 1 file changed, 2 insertions(+), 10 deletions(-) diff --git a/src/emqttd_cli_config.erl b/src/emqttd_cli_config.erl index 4379200b1..332ef9a9d 100644 --- a/src/emqttd_cli_config.erl +++ b/src/emqttd_cli_config.erl @@ -299,19 +299,11 @@ queue_config_callback([_, AppStr, KeyStr], Value) -> queue_config_callback(App, low_watermark, Value) -> {ok, Env} = emqttd:env(App), - Parse = fun(S) -> - {match, [N]} = re:run(S, "^([0-9]+)%$", [{capture, all_but_first, list}]), - list_to_integer(N) / 100 - end, - application:set_env(?APP, App, lists:keyreplace(low_watermark, 1, Env, {low_watermark, Parse(Value)})), + application:set_env(?APP, App, lists:keyreplace(low_watermark, 1, Env, {low_watermark, Value})), " successfully\n"; queue_config_callback(App, high_watermark, Value) -> {ok, Env} = emqttd:env(App), - Parse = fun(S) -> - {match, [N]} = re:run(S, "^([0-9]+)%$", [{capture, all_but_first, list}]), - list_to_integer(N) / 100 - end, - application:set_env(?APP, App, lists:keyreplace(high_watermark, 1, Env, {high_watermark, Parse(Value)})), + application:set_env(?APP, App, lists:keyreplace(high_watermark, 1, Env, {high_watermark, Value})), " successfully\n"; queue_config_callback(App, Key, Value) -> {ok, Env} = emqttd:env(App),