Add Hot configuration plugin
This commit is contained in:
parent
a20b7005be
commit
d3f52898ee
|
@ -72,5 +72,6 @@
|
|||
-define(ERROR10, 110). %% Plugin has been loaded
|
||||
-define(ERROR11, 111). %% Plugin has been loaded
|
||||
-define(ERROR12, 112). %% Client not online
|
||||
-define(ERROR13, 113). %% Modify config fail
|
||||
-define(ERROR13, 113). %% User already exist
|
||||
-define(ERROR14, 114). %% OldPassword error
|
||||
|
||||
|
|
|
@ -624,13 +624,13 @@ end}.
|
|||
%% @doc Low-water mark of queued messages
|
||||
{mapping, "mqtt.mqueue.low_watermark", "emqttd.mqueue", [
|
||||
{default, "20%"},
|
||||
{datatype, string}
|
||||
{datatype, {percent, float}}
|
||||
]}.
|
||||
|
||||
%% @doc High-water mark of queued messages
|
||||
{mapping, "mqtt.mqueue.high_watermark", "emqttd.mqueue", [
|
||||
{default, "60%"},
|
||||
{datatype, string}
|
||||
{datatype, {percent, float}}
|
||||
]}.
|
||||
|
||||
%% @doc Queue Qos0 messages?
|
||||
|
@ -640,14 +640,10 @@ end}.
|
|||
]}.
|
||||
|
||||
{translation, "emqttd.mqueue", fun(Conf) ->
|
||||
Parse = fun(S) ->
|
||||
{match, [N]} = re:run(S, "^([0-9]+)%$", [{capture, all_but_first, list}]),
|
||||
list_to_integer(N) / 100
|
||||
end,
|
||||
Opts = [{type, cuttlefish:conf_get("mqtt.mqueue.type", Conf, simple)},
|
||||
{max_length, cuttlefish:conf_get("mqtt.mqueue.max_length", Conf)},
|
||||
{low_watermark, Parse(cuttlefish:conf_get("mqtt.mqueue.low_watermark", Conf))},
|
||||
{high_watermark, Parse(cuttlefish:conf_get("mqtt.mqueue.high_watermark", Conf))},
|
||||
{low_watermark, cuttlefish:conf_get("mqtt.mqueue.low_watermark", Conf)},
|
||||
{high_watermark, cuttlefish:conf_get("mqtt.mqueue.high_watermark", Conf)},
|
||||
{store_qos0, cuttlefish:conf_get("mqtt.mqueue.store_qos0", Conf)}],
|
||||
case cuttlefish:conf_get("mqtt.mqueue.priority", Conf) of
|
||||
undefined -> Opts;
|
||||
|
|
|
@ -25,18 +25,19 @@
|
|||
|
||||
-module(emqttd_config).
|
||||
|
||||
-export([read/1, dump/2, reload/1, get/2, get/3, set/3]).
|
||||
-export([read/1, write/2, dump/2, reload/1, get/2, get/3, set/3]).
|
||||
|
||||
-type(env() :: {atom(), term()}).
|
||||
|
||||
%% @doc Read the configuration of an application.
|
||||
-spec(read(atom()) -> {ok, list(env())} | {error, term()}).
|
||||
read(_App) ->
|
||||
read(App) ->
|
||||
%% TODO:
|
||||
%% 1. Read the app.conf from etc folder
|
||||
%% 2. Cuttlefish to read the conf
|
||||
%% 3. Return the terms and schema
|
||||
{error, unsupported}.
|
||||
% {error, unsupported}.
|
||||
{ok, read_(App)}.
|
||||
|
||||
%% @doc Reload configuration of an application.
|
||||
-spec(reload(atom()) -> ok | {error, term()}).
|
||||
|
@ -47,6 +48,20 @@ reload(_App) ->
|
|||
%% 3. set/3 to apply the config
|
||||
ok.
|
||||
|
||||
-spec(write(atom(), list(env())) -> ok | {error, term()}).
|
||||
write(App, Terms) ->
|
||||
Configs = lists:map(fun({Key, Val}) ->
|
||||
{cuttlefish_variable:tokenize(binary_to_list(Key)), binary_to_list(Val)}
|
||||
end, Terms),
|
||||
Path = lists:concat([code:priv_dir(App), "/", App, ".schema"]),
|
||||
Schema = cuttlefish_schema:files([Path]),
|
||||
case cuttlefish_generator:map(Schema, Configs) of
|
||||
[{App, Configs1}] ->
|
||||
lists:foreach(fun({Key, Val}) -> application:set_env(App, Key, Val) end, Configs1);
|
||||
_ ->
|
||||
error
|
||||
end.
|
||||
|
||||
-spec(dump(atom(), list(env())) -> ok | {error, term()}).
|
||||
dump(_App, _Terms) ->
|
||||
%% TODO
|
||||
|
@ -70,3 +85,25 @@ get(App, Par) ->
|
|||
get(App, Par, Def) ->
|
||||
emqttd_cli_config:get_cfg(App, Par, Def).
|
||||
|
||||
|
||||
read_(App) ->
|
||||
{ok, PluginsEtcDir} = emqttd:env(plugins_etc_dir),
|
||||
Configs = cuttlefish_conf:file(lists:concat([PluginsEtcDir, App, ".conf"])),
|
||||
Path= lists:concat([code:priv_dir(App), "/", App, ".schema"]),
|
||||
{_, Mappings, _} = cuttlefish_schema:files([Path]),
|
||||
OptionalCfg = lists:foldl(fun(Map, Acc) ->
|
||||
Key = cuttlefish_mapping:variable(Map),
|
||||
case proplists:get_value(Key, Configs) of
|
||||
undefined ->
|
||||
[{cuttlefish_variable:format(Key), "", cuttlefish_mapping:doc(Map), false} | Acc];
|
||||
_ -> Acc
|
||||
end
|
||||
end, [], Mappings),
|
||||
RequiredCfg = lists:foldl(fun({Key, Val}, Acc) ->
|
||||
case lists:keyfind(Key, 2, Mappings) of
|
||||
false -> Acc;
|
||||
Map ->
|
||||
[{cuttlefish_variable:format(Key), Val, cuttlefish_mapping:doc(Map), true} | Acc]
|
||||
end
|
||||
end, [], Configs),
|
||||
RequiredCfg ++ OptionalCfg.
|
||||
|
|
|
@ -51,8 +51,15 @@ handle_request(Req, State) ->
|
|||
handle_request("/status", Req, Req:get(method));
|
||||
"/" ->
|
||||
handle_request("/", Req, Req:get(method));
|
||||
"/api/v2/auth" ->
|
||||
handle_request(Path, Req, State);
|
||||
_ ->
|
||||
if_authorized(Req, fun() -> handle_request(Path, Req, State) end)
|
||||
Host = Req:get_header_value("Host"),
|
||||
[_, Port] = string:tokens(Host, ":"),
|
||||
case Port of
|
||||
"18083" -> handle_request(Path, Req, State);
|
||||
_ -> if_authorized(Req, fun() -> handle_request(Path, Req, State) end)
|
||||
end
|
||||
end.
|
||||
|
||||
handle_request("/api/v2/" ++ Url, Req, #state{dispatch = Dispatch}) ->
|
||||
|
@ -125,14 +132,8 @@ authorized(Req) ->
|
|||
false;
|
||||
"Basic " ++ BasicAuth ->
|
||||
{Username, Password} = user_passwd(BasicAuth),
|
||||
{ok, Peer} = Req:get(peername),
|
||||
Params = params(Req),
|
||||
ClientId = get_value(<<"client">>, Params, http),
|
||||
case emqttd_access_control:auth(#mqtt_client{client_id = ClientId,
|
||||
username = Username,
|
||||
peername = Peer}, Password) of
|
||||
ok -> true;
|
||||
{ok, _IsSuper} ->
|
||||
case emqttd_mgmt:check_user(Username, Password) of
|
||||
ok ->
|
||||
true;
|
||||
{error, Reason} ->
|
||||
lager:error("HTTP Auth failure: username=~s, reason=~p", [Username, Reason]),
|
||||
|
|
|
@ -26,6 +26,8 @@
|
|||
|
||||
-include_lib("stdlib/include/qlc.hrl").
|
||||
|
||||
-include_lib("emq_dashboard/include/emq_dashboard.hrl").
|
||||
|
||||
-import(proplists, [get_value/2]).
|
||||
|
||||
-export([brokers/0, broker/1, metrics/0, metrics/1, stats/1, stats/0,
|
||||
|
@ -43,14 +45,16 @@
|
|||
|
||||
-export([kick_client/1, clean_acl_cache/2]).
|
||||
|
||||
-export([modify_config/3, modify_config/4, get_configs/0, get_config/1]).
|
||||
-export([modify_config/2, modify_config/3, modify_config/4, get_configs/0, get_config/1,
|
||||
get_plugin_config/1, get_plugin_config/2, modify_plugin_config/2, modify_plugin_config/3]).
|
||||
|
||||
-export([add_user/3, check_user/2, user_list/0, lookup_user/1,
|
||||
update_user/2, change_password/3, remove_user/1]).
|
||||
|
||||
-define(KB, 1024).
|
||||
-define(MB, (1024*1024)).
|
||||
-define(GB, (1024*1024*1024)).
|
||||
|
||||
-define(EMPTY_KEY(Key), ((Key == undefined) orelse (Key == <<>>))).
|
||||
|
||||
brokers() ->
|
||||
[{Node, broker(Node)} || Node <- ekka_mnesia:running_nodes()].
|
||||
|
||||
|
@ -235,7 +239,7 @@ subscribe({ClientId, Topic, Qos}) ->
|
|||
{error, format_error(Topic, "validate topic: ${0} fail")}
|
||||
end.
|
||||
|
||||
unsubscribe({ClientId, Topic})->
|
||||
unsubscribe({ClientId, Topic}) ->
|
||||
case validate(topic, Topic) of
|
||||
true ->
|
||||
case emqttd_sm:lookup_session(ClientId) of
|
||||
|
@ -249,43 +253,6 @@ unsubscribe({ClientId, Topic})->
|
|||
{error, format_error(Topic, "validate topic: ${0} fail")}
|
||||
end.
|
||||
|
||||
% publish(Messages) ->
|
||||
% lists:foldl(
|
||||
% fun({ClientId, Topic, Payload, Qos, Retain}, {Success, Failed}) ->
|
||||
% case validate(topic, Topic) of
|
||||
% true ->
|
||||
% Msg = emqttd_message:make(ClientId, Qos, Topic, Payload),
|
||||
% emqttd:publish(Msg#mqtt_message{retain = Retain}),
|
||||
% {[[{topic, Topic}]| Success], Failed};
|
||||
% false ->
|
||||
% {Success, [[{topic, Topic}]| Failed]}
|
||||
% end
|
||||
% end, {[], []}, Messages).
|
||||
|
||||
% subscribers(Subscribers) ->
|
||||
% lists:foldl(
|
||||
% fun({ClientId, Topic, Qos}, {Success, Failed}) ->
|
||||
% case emqttd_sm:lookup_session(ClientId) of
|
||||
% undefined ->
|
||||
% {Success, [[{client_id, ClientId}]|Failed]};
|
||||
% #mqtt_session{sess_pid = SessPid} ->
|
||||
% emqttd_session:subscribe(SessPid, [{Topic, [{qos, Qos}]}]),
|
||||
% {[[{client_id, ClientId}]| Success], Failed}
|
||||
% end
|
||||
% end,{[], []}, Subscribers).
|
||||
|
||||
% unsubscribers(UnSubscribers)->
|
||||
% lists:foldl(
|
||||
% fun({ClientId, Topic}, {Success, Failed}) ->
|
||||
% case emqttd_sm:lookup_session(ClientId) of
|
||||
% undefined ->
|
||||
% {Success, [[{client_id, ClientId}]|Failed]};
|
||||
% #mqtt_session{sess_pid = SessPid} ->
|
||||
% emqttd_session:unsubscriber(SessPid, [{Topic, []}]),
|
||||
% {[[{client_id, ClientId}]| Success], Failed}
|
||||
% end
|
||||
% end, {[], []}, UnSubscribers).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% manager API
|
||||
%%--------------------------------------------------------------------
|
||||
|
@ -317,6 +284,9 @@ clean_acl_cache(Node, ClientId, Topic) ->
|
|||
%%--------------------------------------------------------------------
|
||||
%% Config ENV
|
||||
%%--------------------------------------------------------------------
|
||||
modify_config(App, Terms) ->
|
||||
emqttd_config:write(App, Terms).
|
||||
|
||||
modify_config(App, Key, Value) ->
|
||||
Result = [modify_config(Node, App, Key, Value) || Node <- ekka_mnesia:running_nodes()],
|
||||
lists:any(fun(Item) -> Item =:= ok end, Result).
|
||||
|
@ -334,6 +304,103 @@ get_config(Node) when Node =:= node()->
|
|||
get_config(Node) ->
|
||||
rpc_call(Node, get_config, [Node]).
|
||||
|
||||
get_plugin_config(PluginName) ->
|
||||
emqttd_config:read(PluginName).
|
||||
get_plugin_config(Node, PluginName) ->
|
||||
rpc_call(Node, get_plugin_config, [PluginName]).
|
||||
|
||||
modify_plugin_config(PluginName, Terms) ->
|
||||
emqttd_config:write(PluginName, Terms).
|
||||
modify_plugin_config(Node, PluginName, Terms) ->
|
||||
rpc_call(Node, modify_plugin_config, [PluginName, Terms]).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% manager user API
|
||||
%%--------------------------------------------------------------------
|
||||
check_user(undefined, _) ->
|
||||
{error, "Username undefined"};
|
||||
check_user(_, undefined) ->
|
||||
{error, "Password undefined"};
|
||||
check_user(Username, Password) ->
|
||||
case mnesia:dirty_read(mqtt_admin, Username) of
|
||||
[#mqtt_admin{password = <<Salt:4/binary, Hash/binary>>}] ->
|
||||
case Hash =:= md5_hash(Salt, Password) of
|
||||
true -> ok;
|
||||
false -> {error, "Password error"}
|
||||
end;
|
||||
[] ->
|
||||
{error, "User not found"}
|
||||
end.
|
||||
|
||||
add_user(Username, Password, Tag) ->
|
||||
Admin = #mqtt_admin{username = Username,
|
||||
password = hash(Password),
|
||||
tags = Tag},
|
||||
return(mnesia:transaction(fun add_user_/1, [Admin])).
|
||||
|
||||
add_user_(Admin = #mqtt_admin{username = Username}) ->
|
||||
case mnesia:wread({mqtt_admin, Username}) of
|
||||
[] -> mnesia:write(Admin);
|
||||
[_] -> {error, [{code, ?ERROR13}, {message, <<"User already exist">>}]}
|
||||
end.
|
||||
|
||||
user_list() ->
|
||||
[row(Admin) || Admin <- ets:tab2list(mqtt_admin)].
|
||||
|
||||
lookup_user(Username) ->
|
||||
Admin = mnesia:dirty_read(mqtt_admin, Username),
|
||||
row(Admin).
|
||||
|
||||
update_user(Username, Params) ->
|
||||
case mnesia:dirty_read({mqtt_admin, Username}) of
|
||||
[] ->
|
||||
{error, [{code, ?ERROR5}, {message, <<"User not found">>}]};
|
||||
[User] ->
|
||||
Admin = case proplists:get_value(<<"tags">>, Params) of
|
||||
undefined -> User;
|
||||
Tag -> User#mqtt_admin{tags = Tag}
|
||||
end,
|
||||
return(mnesia:transaction(fun() -> mnesia:write(Admin) end))
|
||||
end.
|
||||
|
||||
remove_user(Username) ->
|
||||
Trans = fun() ->
|
||||
case lookup_user(Username) of
|
||||
[] -> {error, [{code, ?ERROR5}, {message, <<"User not found">>}]};
|
||||
_ -> mnesia:delete({mqtt_admin, Username})
|
||||
end
|
||||
end,
|
||||
return(mnesia:transaction(Trans)).
|
||||
|
||||
change_password(Username, OldPwd, NewPwd) ->
|
||||
Trans = fun() ->
|
||||
case mnesia:wread({mqtt_admin, Username}) of
|
||||
[Admin = #mqtt_admin{password = <<Salt:4/binary, Hash/binary>>}] ->
|
||||
case Hash =:= md5_hash(Salt, OldPwd) of
|
||||
true ->
|
||||
mnesia:write(Admin#mqtt_admin{password = hash(NewPwd)});
|
||||
false ->
|
||||
{error, [{code, ?ERROR14}, {message, <<"OldPassword error">>}]}
|
||||
end;
|
||||
[] ->
|
||||
{error, [{code, ?ERROR5}, {message, <<"User not found">>}]}
|
||||
end
|
||||
end,
|
||||
return(mnesia:transaction(Trans)).
|
||||
|
||||
return({atomic, ok}) ->
|
||||
ok;
|
||||
return({atomic, Error}) ->
|
||||
Error;
|
||||
return({aborted, Reason}) ->
|
||||
lager:error("Mnesia Transaction error:~p~n", [Reason]),
|
||||
error.
|
||||
|
||||
row(#mqtt_admin{username = Username, tags = Tags}) ->
|
||||
[{username, Username}, {tags, Tags}];
|
||||
row([#mqtt_admin{username = Username, tags = Tags}]) ->
|
||||
[{username, Username}, {tags, Tags}];
|
||||
row([]) ->[].
|
||||
%%--------------------------------------------------------------------
|
||||
%% Internel Functions.
|
||||
%%--------------------------------------------------------------------
|
||||
|
@ -413,3 +480,17 @@ tables() ->
|
|||
format_error(Val, Msg) ->
|
||||
re:replace(Msg, <<"\\$\\{[^}]+\\}">>, Val, [global, {return, binary}]).
|
||||
|
||||
hash(Password) ->
|
||||
SaltBin = salt(),
|
||||
<<SaltBin/binary, (md5_hash(SaltBin, Password))/binary>>.
|
||||
|
||||
md5_hash(SaltBin, Password) ->
|
||||
erlang:md5(<<SaltBin/binary, Password/binary>>).
|
||||
|
||||
salt() ->
|
||||
seed(),
|
||||
Salt = rand:uniform(16#ffffffff),
|
||||
<<Salt:32>>.
|
||||
|
||||
seed() ->
|
||||
rand:seed(exsplus, erlang:timestamp()).
|
||||
|
|
|
@ -60,6 +60,23 @@
|
|||
-http_api({"^configs/?$", 'GET', config_list, []}).
|
||||
-http_api({"^nodes/(.+?)/configs/(.+?)/?$", 'PUT', modify_config, [{<<"key">>, binary}, {<<"value">>, binary}]}).
|
||||
-http_api({"^nodes/(.+?)/configs/?$", 'GET', config_list, []}).
|
||||
-http_api({"^nodes/(.+?)/plugin_configs/(.+?)/?$", 'GET', plugin_config_list, []}).
|
||||
-http_api({"^nodes/(.+?)/plugin_configs/(.+?)/?$", 'PUT', modify_plugin_config, []}).
|
||||
|
||||
-http_api({"^users/?$", 'GET', users, []}).
|
||||
-http_api({"^users/?$", 'POST', users, [{<<"username">>, binary},
|
||||
{<<"password">>, binary},
|
||||
{<<"tag">>, binary}]}).
|
||||
-http_api({"^users/(.+?)/?$", 'GET', users, []}).
|
||||
-http_api({"^users/(.+?)/?$", 'PUT', users, []}).
|
||||
-http_api({"^users/(.+?)/?$", 'DELETE', users, []}).
|
||||
|
||||
-http_api({"^auth/?$", 'POST', auth, [{<<"username">>, binary}, {<<"password">>, binary}]}).
|
||||
-http_api({"^change_pwd/(.+?)/?$", 'PUT', change_pwd, [{<<"old_pwd">>, binary},
|
||||
{<<"new_pwd">>, binary}]}).
|
||||
|
||||
-import(proplists, [get_value/2, get_value/3]).
|
||||
|
||||
-export([alarm_list/3]).
|
||||
-export([client/3, client_list/3, client_list/4, kick_client/3, clean_acl_cache/3]).
|
||||
-export([route/3, route_list/2]).
|
||||
|
@ -68,7 +85,10 @@
|
|||
-export([nodes/2, node/3, brokers/2, broker/3, listeners/2, listener/3, metrics/2, metric/3, stats/2, stat/3]).
|
||||
-export([publish/2, subscribe/2, unsubscribe/2]).
|
||||
-export([plugin_list/3, enabled/4]).
|
||||
-export([modify_config/3, modify_config/4, config_list/2, config_list/3]).
|
||||
-export([modify_config/3, modify_config/4, config_list/2, config_list/3,
|
||||
plugin_config_list/4, modify_plugin_config/4]).
|
||||
|
||||
-export([users/2,users/3, auth/2, change_pwd/3]).
|
||||
|
||||
%%--------------------------------------------------------------------------
|
||||
%% alarm
|
||||
|
@ -98,9 +118,9 @@ client('GET', _Params, Key) ->
|
|||
client_list('GET', Params, Node) ->
|
||||
{PageNo, PageSize} = page_params(Params),
|
||||
Data = emqttd_mgmt:client_list(l2a(Node), undefined, PageNo, PageSize),
|
||||
Rows = proplists:get_value(result, Data),
|
||||
TotalPage = proplists:get_value(totalPage, Data),
|
||||
TotalNum = proplists:get_value(totalNum, Data),
|
||||
Rows = get_value(result, Data),
|
||||
TotalPage = get_value(totalPage, Data),
|
||||
TotalNum = get_value(totalNum, Data),
|
||||
{ok, [{current_page, PageNo},
|
||||
{page_size, PageSize},
|
||||
{total_num, TotalNum},
|
||||
|
@ -119,7 +139,7 @@ kick_client('DELETE', _Params, Key) ->
|
|||
end.
|
||||
|
||||
clean_acl_cache('PUT', Params, Key) ->
|
||||
Topic = proplists:get_value(<<"topic">>, Params),
|
||||
Topic = get_value(<<"topic">>, Params),
|
||||
case emqttd_mgmt:clean_acl_cache(l2b(Key), Topic) of
|
||||
true -> {ok, []};
|
||||
false -> {error, [{code, ?ERROR12}]}
|
||||
|
@ -151,9 +171,9 @@ route('GET', _Params, Key) ->
|
|||
route_list('GET', Params) ->
|
||||
{PageNo, PageSize} = page_params(Params),
|
||||
Data = emqttd_mgmt:route_list(undefined, PageNo, PageSize),
|
||||
Rows = proplists:get_value(result, Data),
|
||||
TotalPage = proplists:get_value(totalPage, Data),
|
||||
TotalNum = proplists:get_value(totalNum, Data),
|
||||
Rows = get_value(result, Data),
|
||||
TotalPage = get_value(totalPage, Data),
|
||||
TotalNum = get_value(totalNum, Data),
|
||||
{ok, [{current_page, PageNo},
|
||||
{page_size, PageSize},
|
||||
{total_num, TotalNum},
|
||||
|
@ -176,9 +196,9 @@ session('GET', _Params, Key) ->
|
|||
session_list('GET', Params, Node) ->
|
||||
{PageNo, PageSize} = page_params(Params),
|
||||
Data = emqttd_mgmt:session_list(l2a(Node), undefined, PageNo, PageSize),
|
||||
Rows = proplists:get_value(result, Data),
|
||||
TotalPage = proplists:get_value(totalPage, Data),
|
||||
TotalNum = proplists:get_value(totalNum, Data),
|
||||
Rows = get_value(result, Data),
|
||||
TotalPage = get_value(totalPage, Data),
|
||||
TotalNum = get_value(totalNum, Data),
|
||||
{ok, [{current_page, PageNo},
|
||||
{page_size, PageSize},
|
||||
{total_num, TotalNum},
|
||||
|
@ -193,7 +213,7 @@ session_list('GET', Params, Node, ClientId) ->
|
|||
session_row({ClientId, _Pid, _Persistent, Session}) ->
|
||||
InfoKeys = [clean_sess, max_inflight, inflight_queue, message_queue,
|
||||
message_dropped, awaiting_rel, awaiting_ack, awaiting_comp, created_at],
|
||||
[{client_id, ClientId} | [{Key, format(Key, proplists:get_value(Key, Session))} || Key <- InfoKeys]].
|
||||
[{client_id, ClientId} | [{Key, format(Key, get_value(Key, Session))} || Key <- InfoKeys]].
|
||||
|
||||
%%--------------------------------------------------------------------------
|
||||
%% subscription
|
||||
|
@ -205,9 +225,9 @@ subscription('GET', _Params, Key) ->
|
|||
subscription_list('GET', Params, Node) ->
|
||||
{PageNo, PageSize} = page_params(Params),
|
||||
Data = emqttd_mgmt:subscription_list(l2a(Node), undefined, PageNo, PageSize),
|
||||
Rows = proplists:get_value(result, Data),
|
||||
TotalPage = proplists:get_value(totalPage, Data),
|
||||
TotalNum = proplists:get_value(totalNum, Data),
|
||||
Rows = get_value(result, Data),
|
||||
TotalPage = get_value(totalPage, Data),
|
||||
TotalNum = get_value(totalNum, Data),
|
||||
{ok, [{current_page, PageNo},
|
||||
{page_size, PageSize},
|
||||
{total_num, TotalNum},
|
||||
|
@ -222,7 +242,7 @@ subscription_list('GET', Params, Node, Key) ->
|
|||
subscription_row({{Topic, ClientId}, Option}) when is_pid(ClientId) ->
|
||||
subscription_row({{Topic, l2b(pid_to_list(ClientId))}, Option});
|
||||
subscription_row({{Topic, ClientId}, Option}) ->
|
||||
Qos = proplists:get_value(qos, Option),
|
||||
Qos = get_value(qos, Option),
|
||||
[{client_id, ClientId}, {topic, Topic}, {qos, Qos}].
|
||||
|
||||
%%--------------------------------------------------------------------------
|
||||
|
@ -246,7 +266,7 @@ broker('GET', _Params, Node) ->
|
|||
|
||||
listeners('GET', _Params) ->
|
||||
Data = emqttd_mgmt:listeners(),
|
||||
{ok, [{Node, format_listeners(Listeners, [])} || {Node, Listeners} <- Data]}.
|
||||
{ok, [[{Node, format_listeners(Listeners, [])} || {Node, Listeners} <- Data]]}.
|
||||
|
||||
listener('GET', _Params, Node) ->
|
||||
Data = emqttd_mgmt:listener(l2a(Node)),
|
||||
|
@ -254,7 +274,7 @@ listener('GET', _Params, Node) ->
|
|||
|
||||
metrics('GET', _Params) ->
|
||||
Data = emqttd_mgmt:metrics(),
|
||||
{ok, Data}.
|
||||
{ok, [Data]}.
|
||||
|
||||
metric('GET', _Params, Node) ->
|
||||
Data = emqttd_mgmt:metrics(l2a(Node)),
|
||||
|
@ -262,7 +282,7 @@ metric('GET', _Params, Node) ->
|
|||
|
||||
stats('GET', _Params) ->
|
||||
Data = emqttd_mgmt:stats(),
|
||||
{ok, Data}.
|
||||
{ok, [Data]}.
|
||||
|
||||
stat('GET', _Params, Node) ->
|
||||
Data = emqttd_mgmt:stats(l2a(Node)),
|
||||
|
@ -271,19 +291,19 @@ stat('GET', _Params, Node) ->
|
|||
format_broker(Node, Broker) ->
|
||||
OtpRel = "R" ++ erlang:system_info(otp_release) ++ "/" ++ erlang:system_info(version),
|
||||
[{name, Node},
|
||||
{version, bin(proplists:get_value(version, Broker))},
|
||||
{sysdescr, bin(proplists:get_value(sysdescr, Broker))},
|
||||
{uptime, bin(proplists:get_value(uptime, Broker))},
|
||||
{datetime, bin(proplists:get_value(datetime, Broker))},
|
||||
{version, bin(get_value(version, Broker))},
|
||||
{sysdescr, bin(get_value(sysdescr, Broker))},
|
||||
{uptime, bin(get_value(uptime, Broker))},
|
||||
{datetime, bin(get_value(datetime, Broker))},
|
||||
{otp_release, l2b(OtpRel)},
|
||||
{node_status, 'Running'}].
|
||||
|
||||
format_broker(Broker) ->
|
||||
OtpRel = "R" ++ erlang:system_info(otp_release) ++ "/" ++ erlang:system_info(version),
|
||||
[{version, bin(proplists:get_value(version, Broker))},
|
||||
{sysdescr, bin(proplists:get_value(sysdescr, Broker))},
|
||||
{uptime, bin(proplists:get_value(uptime, Broker))},
|
||||
{datetime, bin(proplists:get_value(datetime, Broker))},
|
||||
[{version, bin(get_value(version, Broker))},
|
||||
{sysdescr, bin(get_value(sysdescr, Broker))},
|
||||
{uptime, bin(get_value(uptime, Broker))},
|
||||
{datetime, bin(get_value(datetime, Broker))},
|
||||
{otp_release, l2b(OtpRel)},
|
||||
{node_status, 'Running'}].
|
||||
|
||||
|
@ -300,11 +320,11 @@ format_listener({Protocol, ListenOn, Info}) ->
|
|||
%% mqtt
|
||||
%%--------------------------------------------------------------------------
|
||||
publish('POST', Params) ->
|
||||
Topic = proplists:get_value(<<"topic">>, Params),
|
||||
ClientId = proplists:get_value(<<"client_id">>, Params, http),
|
||||
Payload = proplists:get_value(<<"payload">>, Params, <<>>),
|
||||
Qos = proplists:get_value(<<"qos">>, Params, 0),
|
||||
Retain = proplists:get_value(<<"retain">>, Params, false),
|
||||
Topic = get_value(<<"topic">>, Params),
|
||||
ClientId = get_value(<<"client_id">>, Params, http),
|
||||
Payload = get_value(<<"payload">>, Params, <<>>),
|
||||
Qos = get_value(<<"qos">>, Params, 0),
|
||||
Retain = get_value(<<"retain">>, Params, false),
|
||||
case emqttd_mgmt:publish({ClientId, Topic, Payload, Qos, Retain}) of
|
||||
ok ->
|
||||
{ok, []};
|
||||
|
@ -313,9 +333,9 @@ publish('POST', Params) ->
|
|||
end.
|
||||
|
||||
subscribe('POST', Params) ->
|
||||
ClientId = proplists:get_value(<<"client_id">>, Params),
|
||||
Topic = proplists:get_value(<<"topic">>, Params),
|
||||
Qos = proplists:get_value(<<"qos">>, Params, 0),
|
||||
ClientId = get_value(<<"client_id">>, Params),
|
||||
Topic = get_value(<<"topic">>, Params),
|
||||
Qos = get_value(<<"qos">>, Params, 0),
|
||||
case emqttd_mgmt:subscribe({ClientId, Topic, Qos}) of
|
||||
ok ->
|
||||
{ok, []};
|
||||
|
@ -324,8 +344,8 @@ subscribe('POST', Params) ->
|
|||
end.
|
||||
|
||||
unsubscribe('POST', Params) ->
|
||||
ClientId = proplists:get_value(<<"client_id">>, Params),
|
||||
Topic = proplists:get_value(<<"topic">>, Params),
|
||||
ClientId = get_value(<<"client_id">>, Params),
|
||||
Topic = get_value(<<"topic">>, Params),
|
||||
case emqttd_mgmt:unsubscribe({ClientId, Topic})of
|
||||
ok ->
|
||||
{ok, []};
|
||||
|
@ -341,7 +361,7 @@ plugin_list('GET', _Params, Node) ->
|
|||
{ok, Plugins}.
|
||||
|
||||
enabled('PUT', Params, Node, PluginName) ->
|
||||
Active = proplists:get_value(<<"active">>, Params),
|
||||
Active = get_value(<<"active">>, Params),
|
||||
case Active of
|
||||
true ->
|
||||
return(emqttd_mgmt:plugin_load(l2a(Node), l2a(PluginName)));
|
||||
|
@ -351,6 +371,8 @@ enabled('PUT', Params, Node, PluginName) ->
|
|||
|
||||
return(Result) ->
|
||||
case Result of
|
||||
ok ->
|
||||
{ok, []};
|
||||
{ok, _} ->
|
||||
{ok, []};
|
||||
{error, already_started} ->
|
||||
|
@ -372,19 +394,19 @@ plugin(#mqtt_plugin{name = Name, version = Ver, descr = Descr,
|
|||
%% modify config
|
||||
%%--------------------------------------------------------------------------
|
||||
modify_config('PUT', Params, App) ->
|
||||
Key = proplists:get_value(<<"key">>, Params, <<"">>),
|
||||
Value = proplists:get_value(<<"value">>, Params, <<"">>),
|
||||
Key = get_value(<<"key">>, Params, <<"">>),
|
||||
Value = get_value(<<"value">>, Params, <<"">>),
|
||||
case emqttd_mgmt:modify_config(l2a(App), b2l(Key), b2l(Value)) of
|
||||
true -> {ok, []};
|
||||
false -> {error, [{code, ?ERROR13}]}
|
||||
false -> {error, [{code, ?ERROR2}]}
|
||||
end.
|
||||
|
||||
modify_config('PUT', Params, Node, App) ->
|
||||
Key = proplists:get_value(<<"key">>, Params, <<"">>),
|
||||
Value = proplists:get_value(<<"value">>, Params, <<"">>),
|
||||
Key = get_value(<<"key">>, Params, <<"">>),
|
||||
Value = get_value(<<"value">>, Params, <<"">>),
|
||||
case emqttd_mgmt:modify_config(l2a(Node), l2a(App), b2l(Key), b2l(Value)) of
|
||||
ok -> {ok, []};
|
||||
_ -> {error, [{code, ?ERROR13}]}
|
||||
_ -> {error, [{code, ?ERROR2}]}
|
||||
end.
|
||||
|
||||
config_list('GET', _Params) ->
|
||||
|
@ -395,6 +417,30 @@ config_list('GET', _Params, Node) ->
|
|||
Data = emqttd_mgmt:get_config(l2a(Node)),
|
||||
{ok, [format_config(Config) || Config <- lists:reverse(Data)]}.
|
||||
|
||||
plugin_config_list('GET', _Params, Node, App) ->
|
||||
{ok, Data} = emqttd_mgmt:get_plugin_config(l2a(Node), l2a(App)),
|
||||
{ok, [format_plugin_config(Config) || Config <- lists:reverse(Data)]}.
|
||||
|
||||
modify_plugin_config('PUT', Params, Node, App) ->
|
||||
PluginName = l2a(App),
|
||||
case emqttd_mgmt:modify_plugin_config(l2a(Node), PluginName, Params) of
|
||||
ok ->
|
||||
Plugins = emqttd_plugins:list(),
|
||||
{_, _, _, _, Status} = lists:keyfind(PluginName, 2, Plugins),
|
||||
case Status of
|
||||
true ->
|
||||
emqttd_plugins:unload(PluginName),
|
||||
timer:sleep(500),
|
||||
emqttd_plugins:load(PluginName),
|
||||
{ok, []};
|
||||
false ->
|
||||
{ok, []}
|
||||
end;
|
||||
_ ->
|
||||
{error, [{code, ?ERROR2}]}
|
||||
end.
|
||||
|
||||
|
||||
format_config([], Acc) ->
|
||||
Acc;
|
||||
format_config([{Key, Value, Datatpye, App}| Configs], Acc) ->
|
||||
|
@ -406,6 +452,53 @@ format_config({Key, Value, Datatpye, App}) ->
|
|||
{<<"datatpye">>, l2b(Datatpye)},
|
||||
{<<"app">>, App}].
|
||||
|
||||
format_plugin_config({Key, Value, Desc, Required}) ->
|
||||
[{<<"key">>, l2b(Key)},
|
||||
{<<"value">>, l2b(Value)},
|
||||
{<<"desc">>, l2b(Desc)},
|
||||
{<<"required">>, Required}].
|
||||
|
||||
%%--------------------------------------------------------------------------
|
||||
%% Admin
|
||||
%%--------------------------------------------------------------------------
|
||||
auth('POST', Params) ->
|
||||
Username = get_value(<<"username">>, Params),
|
||||
Password = get_value(<<"password">>, Params),
|
||||
case emqttd_mgmt:check_user(Username, Password) of
|
||||
ok ->
|
||||
{ok, []};
|
||||
{error, Reason} ->
|
||||
{error, [{code, ?ERROR3}, {message, list_to_binary(Reason)}]}
|
||||
end.
|
||||
|
||||
users('POST', Params) ->
|
||||
Username = get_value(<<"username">>, Params),
|
||||
Password = get_value(<<"password">>, Params),
|
||||
Tag = get_value(<<"tags">>, Params),
|
||||
code(emqttd_mgmt:add_user(Username, Password, Tag));
|
||||
|
||||
users('GET', _Params) ->
|
||||
{ok, [Admin || Admin <- emqttd_mgmt:user_list()]}.
|
||||
|
||||
users('GET', _Params, Username) ->
|
||||
{ok, emqttd_mgmt:lookup_user(list_to_binary(Username))};
|
||||
|
||||
users('PUT', Params, Username) ->
|
||||
code(emqttd_mgmt:update_user(list_to_binary(Username), Params));
|
||||
|
||||
users('DELETE', _Params, "admin") ->
|
||||
{error, [{code, ?ERROR6}, {message, <<"admin cannot be deleted">>}]};
|
||||
users('DELETE', _Params, Username) ->
|
||||
code(emqttd_mgmt:remove_user(list_to_binary(Username))).
|
||||
|
||||
change_pwd('PUT', Params, Username) ->
|
||||
OldPwd = get_value(<<"old_pwd">>, Params),
|
||||
NewPwd = get_value(<<"new_pwd">>, Params),
|
||||
code(emqttd_mgmt:change_password(list_to_binary(Username), OldPwd, NewPwd)).
|
||||
|
||||
code(ok) -> {ok, []};
|
||||
code(error) -> {error, [{code, ?ERROR2}]};
|
||||
code({error, Error}) -> {error, Error}.
|
||||
%%--------------------------------------------------------------------------
|
||||
%% Inner function
|
||||
%%--------------------------------------------------------------------------
|
||||
|
@ -446,6 +539,6 @@ b2l(B) -> binary_to_list(B).
|
|||
|
||||
|
||||
page_params(Params) ->
|
||||
PageNo = int(proplists:get_value("curr_page", Params, "1")),
|
||||
PageSize = int(proplists:get_value("page_size", Params, "20")),
|
||||
PageNo = int(get_value("curr_page", Params, "1")),
|
||||
PageSize = int(get_value("page_size", Params, "20")),
|
||||
{PageNo, PageSize}.
|
||||
|
|
Loading…
Reference in New Issue