diff --git a/include/emqttd_internal.hrl b/include/emqttd_internal.hrl index 06398e31f..343be68e4 100644 --- a/include/emqttd_internal.hrl +++ b/include/emqttd_internal.hrl @@ -72,5 +72,6 @@ -define(ERROR10, 110). %% Plugin has been loaded -define(ERROR11, 111). %% Plugin has been loaded -define(ERROR12, 112). %% Client not online --define(ERROR13, 113). %% Modify config fail +-define(ERROR13, 113). %% User already exist +-define(ERROR14, 114). %% OldPassword error diff --git a/priv/emq.schema b/priv/emq.schema index bd533d05e..ce4baf36b 100644 --- a/priv/emq.schema +++ b/priv/emq.schema @@ -624,13 +624,13 @@ end}. %% @doc Low-water mark of queued messages {mapping, "mqtt.mqueue.low_watermark", "emqttd.mqueue", [ {default, "20%"}, - {datatype, string} + {datatype, {percent, float}} ]}. %% @doc High-water mark of queued messages {mapping, "mqtt.mqueue.high_watermark", "emqttd.mqueue", [ {default, "60%"}, - {datatype, string} + {datatype, {percent, float}} ]}. %% @doc Queue Qos0 messages? @@ -640,14 +640,10 @@ end}. ]}. {translation, "emqttd.mqueue", fun(Conf) -> - Parse = fun(S) -> - {match, [N]} = re:run(S, "^([0-9]+)%$", [{capture, all_but_first, list}]), - list_to_integer(N) / 100 - end, Opts = [{type, cuttlefish:conf_get("mqtt.mqueue.type", Conf, simple)}, {max_length, cuttlefish:conf_get("mqtt.mqueue.max_length", Conf)}, - {low_watermark, Parse(cuttlefish:conf_get("mqtt.mqueue.low_watermark", Conf))}, - {high_watermark, Parse(cuttlefish:conf_get("mqtt.mqueue.high_watermark", Conf))}, + {low_watermark, cuttlefish:conf_get("mqtt.mqueue.low_watermark", Conf)}, + {high_watermark, cuttlefish:conf_get("mqtt.mqueue.high_watermark", Conf)}, {store_qos0, cuttlefish:conf_get("mqtt.mqueue.store_qos0", Conf)}], case cuttlefish:conf_get("mqtt.mqueue.priority", Conf) of undefined -> Opts; diff --git a/src/emqttd_config.erl b/src/emqttd_config.erl index 77a4e8949..cae8ad33c 100644 --- a/src/emqttd_config.erl +++ b/src/emqttd_config.erl @@ -25,18 +25,19 @@ -module(emqttd_config). --export([read/1, dump/2, reload/1, get/2, get/3, set/3]). +-export([read/1, write/2, dump/2, reload/1, get/2, get/3, set/3]). -type(env() :: {atom(), term()}). %% @doc Read the configuration of an application. -spec(read(atom()) -> {ok, list(env())} | {error, term()}). -read(_App) -> +read(App) -> %% TODO: %% 1. Read the app.conf from etc folder %% 2. Cuttlefish to read the conf %% 3. Return the terms and schema - {error, unsupported}. + % {error, unsupported}. + {ok, read_(App)}. %% @doc Reload configuration of an application. -spec(reload(atom()) -> ok | {error, term()}). @@ -47,6 +48,20 @@ reload(_App) -> %% 3. set/3 to apply the config ok. +-spec(write(atom(), list(env())) -> ok | {error, term()}). +write(App, Terms) -> + Configs = lists:map(fun({Key, Val}) -> + {cuttlefish_variable:tokenize(binary_to_list(Key)), binary_to_list(Val)} + end, Terms), + Path = lists:concat([code:priv_dir(App), "/", App, ".schema"]), + Schema = cuttlefish_schema:files([Path]), + case cuttlefish_generator:map(Schema, Configs) of + [{App, Configs1}] -> + lists:foreach(fun({Key, Val}) -> application:set_env(App, Key, Val) end, Configs1); + _ -> + error + end. + -spec(dump(atom(), list(env())) -> ok | {error, term()}). dump(_App, _Terms) -> %% TODO @@ -70,3 +85,25 @@ get(App, Par) -> get(App, Par, Def) -> emqttd_cli_config:get_cfg(App, Par, Def). + +read_(App) -> + {ok, PluginsEtcDir} = emqttd:env(plugins_etc_dir), + Configs = cuttlefish_conf:file(lists:concat([PluginsEtcDir, App, ".conf"])), + Path= lists:concat([code:priv_dir(App), "/", App, ".schema"]), + {_, Mappings, _} = cuttlefish_schema:files([Path]), + OptionalCfg = lists:foldl(fun(Map, Acc) -> + Key = cuttlefish_mapping:variable(Map), + case proplists:get_value(Key, Configs) of + undefined -> + [{cuttlefish_variable:format(Key), "", cuttlefish_mapping:doc(Map), false} | Acc]; + _ -> Acc + end + end, [], Mappings), + RequiredCfg = lists:foldl(fun({Key, Val}, Acc) -> + case lists:keyfind(Key, 2, Mappings) of + false -> Acc; + Map -> + [{cuttlefish_variable:format(Key), Val, cuttlefish_mapping:doc(Map), true} | Acc] + end + end, [], Configs), + RequiredCfg ++ OptionalCfg. diff --git a/src/emqttd_http.erl b/src/emqttd_http.erl index 4846bdb50..c1b255a40 100644 --- a/src/emqttd_http.erl +++ b/src/emqttd_http.erl @@ -51,8 +51,15 @@ handle_request(Req, State) -> handle_request("/status", Req, Req:get(method)); "/" -> handle_request("/", Req, Req:get(method)); + "/api/v2/auth" -> + handle_request(Path, Req, State); _ -> - if_authorized(Req, fun() -> handle_request(Path, Req, State) end) + Host = Req:get_header_value("Host"), + [_, Port] = string:tokens(Host, ":"), + case Port of + "18083" -> handle_request(Path, Req, State); + _ -> if_authorized(Req, fun() -> handle_request(Path, Req, State) end) + end end. handle_request("/api/v2/" ++ Url, Req, #state{dispatch = Dispatch}) -> @@ -125,14 +132,8 @@ authorized(Req) -> false; "Basic " ++ BasicAuth -> {Username, Password} = user_passwd(BasicAuth), - {ok, Peer} = Req:get(peername), - Params = params(Req), - ClientId = get_value(<<"client">>, Params, http), - case emqttd_access_control:auth(#mqtt_client{client_id = ClientId, - username = Username, - peername = Peer}, Password) of - ok -> true; - {ok, _IsSuper} -> + case emqttd_mgmt:check_user(Username, Password) of + ok -> true; {error, Reason} -> lager:error("HTTP Auth failure: username=~s, reason=~p", [Username, Reason]), diff --git a/src/emqttd_mgmt.erl b/src/emqttd_mgmt.erl index 56675bbea..a04926102 100644 --- a/src/emqttd_mgmt.erl +++ b/src/emqttd_mgmt.erl @@ -26,6 +26,8 @@ -include_lib("stdlib/include/qlc.hrl"). +-include_lib("emq_dashboard/include/emq_dashboard.hrl"). + -import(proplists, [get_value/2]). -export([brokers/0, broker/1, metrics/0, metrics/1, stats/1, stats/0, @@ -43,14 +45,16 @@ -export([kick_client/1, clean_acl_cache/2]). --export([modify_config/3, modify_config/4, get_configs/0, get_config/1]). +-export([modify_config/2, modify_config/3, modify_config/4, get_configs/0, get_config/1, + get_plugin_config/1, get_plugin_config/2, modify_plugin_config/2, modify_plugin_config/3]). + +-export([add_user/3, check_user/2, user_list/0, lookup_user/1, + update_user/2, change_password/3, remove_user/1]). -define(KB, 1024). -define(MB, (1024*1024)). -define(GB, (1024*1024*1024)). --define(EMPTY_KEY(Key), ((Key == undefined) orelse (Key == <<>>))). - brokers() -> [{Node, broker(Node)} || Node <- ekka_mnesia:running_nodes()]. @@ -235,7 +239,7 @@ subscribe({ClientId, Topic, Qos}) -> {error, format_error(Topic, "validate topic: ${0} fail")} end. -unsubscribe({ClientId, Topic})-> +unsubscribe({ClientId, Topic}) -> case validate(topic, Topic) of true -> case emqttd_sm:lookup_session(ClientId) of @@ -248,43 +252,6 @@ unsubscribe({ClientId, Topic})-> false -> {error, format_error(Topic, "validate topic: ${0} fail")} end. - -% publish(Messages) -> -% lists:foldl( -% fun({ClientId, Topic, Payload, Qos, Retain}, {Success, Failed}) -> -% case validate(topic, Topic) of -% true -> -% Msg = emqttd_message:make(ClientId, Qos, Topic, Payload), -% emqttd:publish(Msg#mqtt_message{retain = Retain}), -% {[[{topic, Topic}]| Success], Failed}; -% false -> -% {Success, [[{topic, Topic}]| Failed]} -% end -% end, {[], []}, Messages). - -% subscribers(Subscribers) -> -% lists:foldl( -% fun({ClientId, Topic, Qos}, {Success, Failed}) -> -% case emqttd_sm:lookup_session(ClientId) of -% undefined -> -% {Success, [[{client_id, ClientId}]|Failed]}; -% #mqtt_session{sess_pid = SessPid} -> -% emqttd_session:subscribe(SessPid, [{Topic, [{qos, Qos}]}]), -% {[[{client_id, ClientId}]| Success], Failed} -% end -% end,{[], []}, Subscribers). - -% unsubscribers(UnSubscribers)-> -% lists:foldl( -% fun({ClientId, Topic}, {Success, Failed}) -> -% case emqttd_sm:lookup_session(ClientId) of -% undefined -> -% {Success, [[{client_id, ClientId}]|Failed]}; -% #mqtt_session{sess_pid = SessPid} -> -% emqttd_session:unsubscriber(SessPid, [{Topic, []}]), -% {[[{client_id, ClientId}]| Success], Failed} -% end -% end, {[], []}, UnSubscribers). %%-------------------------------------------------------------------- %% manager API @@ -317,6 +284,9 @@ clean_acl_cache(Node, ClientId, Topic) -> %%-------------------------------------------------------------------- %% Config ENV %%-------------------------------------------------------------------- +modify_config(App, Terms) -> + emqttd_config:write(App, Terms). + modify_config(App, Key, Value) -> Result = [modify_config(Node, App, Key, Value) || Node <- ekka_mnesia:running_nodes()], lists:any(fun(Item) -> Item =:= ok end, Result). @@ -334,6 +304,103 @@ get_config(Node) when Node =:= node()-> get_config(Node) -> rpc_call(Node, get_config, [Node]). +get_plugin_config(PluginName) -> + emqttd_config:read(PluginName). +get_plugin_config(Node, PluginName) -> + rpc_call(Node, get_plugin_config, [PluginName]). + +modify_plugin_config(PluginName, Terms) -> + emqttd_config:write(PluginName, Terms). +modify_plugin_config(Node, PluginName, Terms) -> + rpc_call(Node, modify_plugin_config, [PluginName, Terms]). + +%%-------------------------------------------------------------------- +%% manager user API +%%-------------------------------------------------------------------- +check_user(undefined, _) -> + {error, "Username undefined"}; +check_user(_, undefined) -> + {error, "Password undefined"}; +check_user(Username, Password) -> + case mnesia:dirty_read(mqtt_admin, Username) of + [#mqtt_admin{password = <>}] -> + case Hash =:= md5_hash(Salt, Password) of + true -> ok; + false -> {error, "Password error"} + end; + [] -> + {error, "User not found"} + end. + +add_user(Username, Password, Tag) -> + Admin = #mqtt_admin{username = Username, + password = hash(Password), + tags = Tag}, + return(mnesia:transaction(fun add_user_/1, [Admin])). + +add_user_(Admin = #mqtt_admin{username = Username}) -> + case mnesia:wread({mqtt_admin, Username}) of + [] -> mnesia:write(Admin); + [_] -> {error, [{code, ?ERROR13}, {message, <<"User already exist">>}]} + end. + +user_list() -> + [row(Admin) || Admin <- ets:tab2list(mqtt_admin)]. + +lookup_user(Username) -> + Admin = mnesia:dirty_read(mqtt_admin, Username), + row(Admin). + +update_user(Username, Params) -> + case mnesia:dirty_read({mqtt_admin, Username}) of + [] -> + {error, [{code, ?ERROR5}, {message, <<"User not found">>}]}; + [User] -> + Admin = case proplists:get_value(<<"tags">>, Params) of + undefined -> User; + Tag -> User#mqtt_admin{tags = Tag} + end, + return(mnesia:transaction(fun() -> mnesia:write(Admin) end)) + end. + +remove_user(Username) -> + Trans = fun() -> + case lookup_user(Username) of + [] -> {error, [{code, ?ERROR5}, {message, <<"User not found">>}]}; + _ -> mnesia:delete({mqtt_admin, Username}) + end + end, + return(mnesia:transaction(Trans)). + +change_password(Username, OldPwd, NewPwd) -> + Trans = fun() -> + case mnesia:wread({mqtt_admin, Username}) of + [Admin = #mqtt_admin{password = <>}] -> + case Hash =:= md5_hash(Salt, OldPwd) of + true -> + mnesia:write(Admin#mqtt_admin{password = hash(NewPwd)}); + false -> + {error, [{code, ?ERROR14}, {message, <<"OldPassword error">>}]} + end; + [] -> + {error, [{code, ?ERROR5}, {message, <<"User not found">>}]} + end + end, + return(mnesia:transaction(Trans)). + +return({atomic, ok}) -> + ok; +return({atomic, Error}) -> + Error; +return({aborted, Reason}) -> + lager:error("Mnesia Transaction error:~p~n", [Reason]), + error. + +row(#mqtt_admin{username = Username, tags = Tags}) -> + [{username, Username}, {tags, Tags}]; +row([#mqtt_admin{username = Username, tags = Tags}]) -> + [{username, Username}, {tags, Tags}]; +row([]) ->[]. %%-------------------------------------------------------------------- %% Internel Functions. %%-------------------------------------------------------------------- @@ -413,3 +480,17 @@ tables() -> format_error(Val, Msg) -> re:replace(Msg, <<"\\$\\{[^}]+\\}">>, Val, [global, {return, binary}]). +hash(Password) -> + SaltBin = salt(), + <>. + +md5_hash(SaltBin, Password) -> + erlang:md5(<>). + +salt() -> + seed(), + Salt = rand:uniform(16#ffffffff), + <>. + +seed() -> + rand:seed(exsplus, erlang:timestamp()). diff --git a/src/emqttd_rest_api.erl b/src/emqttd_rest_api.erl index adb6e5af6..5d3f3a2fd 100644 --- a/src/emqttd_rest_api.erl +++ b/src/emqttd_rest_api.erl @@ -60,6 +60,23 @@ -http_api({"^configs/?$", 'GET', config_list, []}). -http_api({"^nodes/(.+?)/configs/(.+?)/?$", 'PUT', modify_config, [{<<"key">>, binary}, {<<"value">>, binary}]}). -http_api({"^nodes/(.+?)/configs/?$", 'GET', config_list, []}). +-http_api({"^nodes/(.+?)/plugin_configs/(.+?)/?$", 'GET', plugin_config_list, []}). +-http_api({"^nodes/(.+?)/plugin_configs/(.+?)/?$", 'PUT', modify_plugin_config, []}). + +-http_api({"^users/?$", 'GET', users, []}). +-http_api({"^users/?$", 'POST', users, [{<<"username">>, binary}, + {<<"password">>, binary}, + {<<"tag">>, binary}]}). +-http_api({"^users/(.+?)/?$", 'GET', users, []}). +-http_api({"^users/(.+?)/?$", 'PUT', users, []}). +-http_api({"^users/(.+?)/?$", 'DELETE', users, []}). + +-http_api({"^auth/?$", 'POST', auth, [{<<"username">>, binary}, {<<"password">>, binary}]}). +-http_api({"^change_pwd/(.+?)/?$", 'PUT', change_pwd, [{<<"old_pwd">>, binary}, + {<<"new_pwd">>, binary}]}). + +-import(proplists, [get_value/2, get_value/3]). + -export([alarm_list/3]). -export([client/3, client_list/3, client_list/4, kick_client/3, clean_acl_cache/3]). -export([route/3, route_list/2]). @@ -68,7 +85,10 @@ -export([nodes/2, node/3, brokers/2, broker/3, listeners/2, listener/3, metrics/2, metric/3, stats/2, stat/3]). -export([publish/2, subscribe/2, unsubscribe/2]). -export([plugin_list/3, enabled/4]). --export([modify_config/3, modify_config/4, config_list/2, config_list/3]). +-export([modify_config/3, modify_config/4, config_list/2, config_list/3, + plugin_config_list/4, modify_plugin_config/4]). + +-export([users/2,users/3, auth/2, change_pwd/3]). %%-------------------------------------------------------------------------- %% alarm @@ -98,9 +118,9 @@ client('GET', _Params, Key) -> client_list('GET', Params, Node) -> {PageNo, PageSize} = page_params(Params), Data = emqttd_mgmt:client_list(l2a(Node), undefined, PageNo, PageSize), - Rows = proplists:get_value(result, Data), - TotalPage = proplists:get_value(totalPage, Data), - TotalNum = proplists:get_value(totalNum, Data), + Rows = get_value(result, Data), + TotalPage = get_value(totalPage, Data), + TotalNum = get_value(totalNum, Data), {ok, [{current_page, PageNo}, {page_size, PageSize}, {total_num, TotalNum}, @@ -119,7 +139,7 @@ kick_client('DELETE', _Params, Key) -> end. clean_acl_cache('PUT', Params, Key) -> - Topic = proplists:get_value(<<"topic">>, Params), + Topic = get_value(<<"topic">>, Params), case emqttd_mgmt:clean_acl_cache(l2b(Key), Topic) of true -> {ok, []}; false -> {error, [{code, ?ERROR12}]} @@ -151,9 +171,9 @@ route('GET', _Params, Key) -> route_list('GET', Params) -> {PageNo, PageSize} = page_params(Params), Data = emqttd_mgmt:route_list(undefined, PageNo, PageSize), - Rows = proplists:get_value(result, Data), - TotalPage = proplists:get_value(totalPage, Data), - TotalNum = proplists:get_value(totalNum, Data), + Rows = get_value(result, Data), + TotalPage = get_value(totalPage, Data), + TotalNum = get_value(totalNum, Data), {ok, [{current_page, PageNo}, {page_size, PageSize}, {total_num, TotalNum}, @@ -176,9 +196,9 @@ session('GET', _Params, Key) -> session_list('GET', Params, Node) -> {PageNo, PageSize} = page_params(Params), Data = emqttd_mgmt:session_list(l2a(Node), undefined, PageNo, PageSize), - Rows = proplists:get_value(result, Data), - TotalPage = proplists:get_value(totalPage, Data), - TotalNum = proplists:get_value(totalNum, Data), + Rows = get_value(result, Data), + TotalPage = get_value(totalPage, Data), + TotalNum = get_value(totalNum, Data), {ok, [{current_page, PageNo}, {page_size, PageSize}, {total_num, TotalNum}, @@ -193,7 +213,7 @@ session_list('GET', Params, Node, ClientId) -> session_row({ClientId, _Pid, _Persistent, Session}) -> InfoKeys = [clean_sess, max_inflight, inflight_queue, message_queue, message_dropped, awaiting_rel, awaiting_ack, awaiting_comp, created_at], - [{client_id, ClientId} | [{Key, format(Key, proplists:get_value(Key, Session))} || Key <- InfoKeys]]. + [{client_id, ClientId} | [{Key, format(Key, get_value(Key, Session))} || Key <- InfoKeys]]. %%-------------------------------------------------------------------------- %% subscription @@ -205,9 +225,9 @@ subscription('GET', _Params, Key) -> subscription_list('GET', Params, Node) -> {PageNo, PageSize} = page_params(Params), Data = emqttd_mgmt:subscription_list(l2a(Node), undefined, PageNo, PageSize), - Rows = proplists:get_value(result, Data), - TotalPage = proplists:get_value(totalPage, Data), - TotalNum = proplists:get_value(totalNum, Data), + Rows = get_value(result, Data), + TotalPage = get_value(totalPage, Data), + TotalNum = get_value(totalNum, Data), {ok, [{current_page, PageNo}, {page_size, PageSize}, {total_num, TotalNum}, @@ -222,7 +242,7 @@ subscription_list('GET', Params, Node, Key) -> subscription_row({{Topic, ClientId}, Option}) when is_pid(ClientId) -> subscription_row({{Topic, l2b(pid_to_list(ClientId))}, Option}); subscription_row({{Topic, ClientId}, Option}) -> - Qos = proplists:get_value(qos, Option), + Qos = get_value(qos, Option), [{client_id, ClientId}, {topic, Topic}, {qos, Qos}]. %%-------------------------------------------------------------------------- @@ -246,7 +266,7 @@ broker('GET', _Params, Node) -> listeners('GET', _Params) -> Data = emqttd_mgmt:listeners(), - {ok, [{Node, format_listeners(Listeners, [])} || {Node, Listeners} <- Data]}. + {ok, [[{Node, format_listeners(Listeners, [])} || {Node, Listeners} <- Data]]}. listener('GET', _Params, Node) -> Data = emqttd_mgmt:listener(l2a(Node)), @@ -254,7 +274,7 @@ listener('GET', _Params, Node) -> metrics('GET', _Params) -> Data = emqttd_mgmt:metrics(), - {ok, Data}. + {ok, [Data]}. metric('GET', _Params, Node) -> Data = emqttd_mgmt:metrics(l2a(Node)), @@ -262,7 +282,7 @@ metric('GET', _Params, Node) -> stats('GET', _Params) -> Data = emqttd_mgmt:stats(), - {ok, Data}. + {ok, [Data]}. stat('GET', _Params, Node) -> Data = emqttd_mgmt:stats(l2a(Node)), @@ -271,19 +291,19 @@ stat('GET', _Params, Node) -> format_broker(Node, Broker) -> OtpRel = "R" ++ erlang:system_info(otp_release) ++ "/" ++ erlang:system_info(version), [{name, Node}, - {version, bin(proplists:get_value(version, Broker))}, - {sysdescr, bin(proplists:get_value(sysdescr, Broker))}, - {uptime, bin(proplists:get_value(uptime, Broker))}, - {datetime, bin(proplists:get_value(datetime, Broker))}, + {version, bin(get_value(version, Broker))}, + {sysdescr, bin(get_value(sysdescr, Broker))}, + {uptime, bin(get_value(uptime, Broker))}, + {datetime, bin(get_value(datetime, Broker))}, {otp_release, l2b(OtpRel)}, {node_status, 'Running'}]. format_broker(Broker) -> OtpRel = "R" ++ erlang:system_info(otp_release) ++ "/" ++ erlang:system_info(version), - [{version, bin(proplists:get_value(version, Broker))}, - {sysdescr, bin(proplists:get_value(sysdescr, Broker))}, - {uptime, bin(proplists:get_value(uptime, Broker))}, - {datetime, bin(proplists:get_value(datetime, Broker))}, + [{version, bin(get_value(version, Broker))}, + {sysdescr, bin(get_value(sysdescr, Broker))}, + {uptime, bin(get_value(uptime, Broker))}, + {datetime, bin(get_value(datetime, Broker))}, {otp_release, l2b(OtpRel)}, {node_status, 'Running'}]. @@ -300,11 +320,11 @@ format_listener({Protocol, ListenOn, Info}) -> %% mqtt %%-------------------------------------------------------------------------- publish('POST', Params) -> - Topic = proplists:get_value(<<"topic">>, Params), - ClientId = proplists:get_value(<<"client_id">>, Params, http), - Payload = proplists:get_value(<<"payload">>, Params, <<>>), - Qos = proplists:get_value(<<"qos">>, Params, 0), - Retain = proplists:get_value(<<"retain">>, Params, false), + Topic = get_value(<<"topic">>, Params), + ClientId = get_value(<<"client_id">>, Params, http), + Payload = get_value(<<"payload">>, Params, <<>>), + Qos = get_value(<<"qos">>, Params, 0), + Retain = get_value(<<"retain">>, Params, false), case emqttd_mgmt:publish({ClientId, Topic, Payload, Qos, Retain}) of ok -> {ok, []}; @@ -313,9 +333,9 @@ publish('POST', Params) -> end. subscribe('POST', Params) -> - ClientId = proplists:get_value(<<"client_id">>, Params), - Topic = proplists:get_value(<<"topic">>, Params), - Qos = proplists:get_value(<<"qos">>, Params, 0), + ClientId = get_value(<<"client_id">>, Params), + Topic = get_value(<<"topic">>, Params), + Qos = get_value(<<"qos">>, Params, 0), case emqttd_mgmt:subscribe({ClientId, Topic, Qos}) of ok -> {ok, []}; @@ -324,8 +344,8 @@ subscribe('POST', Params) -> end. unsubscribe('POST', Params) -> - ClientId = proplists:get_value(<<"client_id">>, Params), - Topic = proplists:get_value(<<"topic">>, Params), + ClientId = get_value(<<"client_id">>, Params), + Topic = get_value(<<"topic">>, Params), case emqttd_mgmt:unsubscribe({ClientId, Topic})of ok -> {ok, []}; @@ -341,7 +361,7 @@ plugin_list('GET', _Params, Node) -> {ok, Plugins}. enabled('PUT', Params, Node, PluginName) -> - Active = proplists:get_value(<<"active">>, Params), + Active = get_value(<<"active">>, Params), case Active of true -> return(emqttd_mgmt:plugin_load(l2a(Node), l2a(PluginName))); @@ -351,6 +371,8 @@ enabled('PUT', Params, Node, PluginName) -> return(Result) -> case Result of + ok -> + {ok, []}; {ok, _} -> {ok, []}; {error, already_started} -> @@ -372,19 +394,19 @@ plugin(#mqtt_plugin{name = Name, version = Ver, descr = Descr, %% modify config %%-------------------------------------------------------------------------- modify_config('PUT', Params, App) -> - Key = proplists:get_value(<<"key">>, Params, <<"">>), - Value = proplists:get_value(<<"value">>, Params, <<"">>), + Key = get_value(<<"key">>, Params, <<"">>), + Value = get_value(<<"value">>, Params, <<"">>), case emqttd_mgmt:modify_config(l2a(App), b2l(Key), b2l(Value)) of true -> {ok, []}; - false -> {error, [{code, ?ERROR13}]} + false -> {error, [{code, ?ERROR2}]} end. modify_config('PUT', Params, Node, App) -> - Key = proplists:get_value(<<"key">>, Params, <<"">>), - Value = proplists:get_value(<<"value">>, Params, <<"">>), + Key = get_value(<<"key">>, Params, <<"">>), + Value = get_value(<<"value">>, Params, <<"">>), case emqttd_mgmt:modify_config(l2a(Node), l2a(App), b2l(Key), b2l(Value)) of ok -> {ok, []}; - _ -> {error, [{code, ?ERROR13}]} + _ -> {error, [{code, ?ERROR2}]} end. config_list('GET', _Params) -> @@ -395,6 +417,30 @@ config_list('GET', _Params, Node) -> Data = emqttd_mgmt:get_config(l2a(Node)), {ok, [format_config(Config) || Config <- lists:reverse(Data)]}. +plugin_config_list('GET', _Params, Node, App) -> + {ok, Data} = emqttd_mgmt:get_plugin_config(l2a(Node), l2a(App)), + {ok, [format_plugin_config(Config) || Config <- lists:reverse(Data)]}. + +modify_plugin_config('PUT', Params, Node, App) -> + PluginName = l2a(App), + case emqttd_mgmt:modify_plugin_config(l2a(Node), PluginName, Params) of + ok -> + Plugins = emqttd_plugins:list(), + {_, _, _, _, Status} = lists:keyfind(PluginName, 2, Plugins), + case Status of + true -> + emqttd_plugins:unload(PluginName), + timer:sleep(500), + emqttd_plugins:load(PluginName), + {ok, []}; + false -> + {ok, []} + end; + _ -> + {error, [{code, ?ERROR2}]} + end. + + format_config([], Acc) -> Acc; format_config([{Key, Value, Datatpye, App}| Configs], Acc) -> @@ -406,6 +452,53 @@ format_config({Key, Value, Datatpye, App}) -> {<<"datatpye">>, l2b(Datatpye)}, {<<"app">>, App}]. +format_plugin_config({Key, Value, Desc, Required}) -> + [{<<"key">>, l2b(Key)}, + {<<"value">>, l2b(Value)}, + {<<"desc">>, l2b(Desc)}, + {<<"required">>, Required}]. + +%%-------------------------------------------------------------------------- +%% Admin +%%-------------------------------------------------------------------------- +auth('POST', Params) -> + Username = get_value(<<"username">>, Params), + Password = get_value(<<"password">>, Params), + case emqttd_mgmt:check_user(Username, Password) of + ok -> + {ok, []}; + {error, Reason} -> + {error, [{code, ?ERROR3}, {message, list_to_binary(Reason)}]} + end. + +users('POST', Params) -> + Username = get_value(<<"username">>, Params), + Password = get_value(<<"password">>, Params), + Tag = get_value(<<"tags">>, Params), + code(emqttd_mgmt:add_user(Username, Password, Tag)); + +users('GET', _Params) -> + {ok, [Admin || Admin <- emqttd_mgmt:user_list()]}. + +users('GET', _Params, Username) -> + {ok, emqttd_mgmt:lookup_user(list_to_binary(Username))}; + +users('PUT', Params, Username) -> + code(emqttd_mgmt:update_user(list_to_binary(Username), Params)); + +users('DELETE', _Params, "admin") -> + {error, [{code, ?ERROR6}, {message, <<"admin cannot be deleted">>}]}; +users('DELETE', _Params, Username) -> + code(emqttd_mgmt:remove_user(list_to_binary(Username))). + +change_pwd('PUT', Params, Username) -> + OldPwd = get_value(<<"old_pwd">>, Params), + NewPwd = get_value(<<"new_pwd">>, Params), + code(emqttd_mgmt:change_password(list_to_binary(Username), OldPwd, NewPwd)). + +code(ok) -> {ok, []}; +code(error) -> {error, [{code, ?ERROR2}]}; +code({error, Error}) -> {error, Error}. %%-------------------------------------------------------------------------- %% Inner function %%-------------------------------------------------------------------------- @@ -446,6 +539,6 @@ b2l(B) -> binary_to_list(B). page_params(Params) -> - PageNo = int(proplists:get_value("curr_page", Params, "1")), - PageSize = int(proplists:get_value("page_size", Params, "20")), + PageNo = int(get_value("curr_page", Params, "1")), + PageSize = int(get_value("page_size", Params, "20")), {PageNo, PageSize}.