Merge pull request #1212 from emqtt/develop
REST API add modify_config/config_list
This commit is contained in:
commit
86892fd016
2
Makefile
2
Makefile
|
@ -36,7 +36,7 @@ EUNIT_OPTS = verbose
|
||||||
|
|
||||||
CT_SUITES = emqttd emqttd_access emqttd_lib emqttd_inflight emqttd_mod \
|
CT_SUITES = emqttd emqttd_access emqttd_lib emqttd_inflight emqttd_mod \
|
||||||
emqttd_net emqttd_mqueue emqttd_protocol emqttd_topic \
|
emqttd_net emqttd_mqueue emqttd_protocol emqttd_topic \
|
||||||
emqttd_trie emqttd_vm
|
emqttd_trie emqttd_vm emqttd_config
|
||||||
|
|
||||||
CT_OPTS = -cover test/ct.cover.spec -erl_args -name emqttd_ct@127.0.0.1
|
CT_OPTS = -cover test/ct.cover.spec -erl_args -name emqttd_ct@127.0.0.1
|
||||||
|
|
||||||
|
|
|
@ -72,4 +72,5 @@
|
||||||
-define(ERROR10, 110). %% Plugin has been loaded
|
-define(ERROR10, 110). %% Plugin has been loaded
|
||||||
-define(ERROR11, 111). %% Plugin has been loaded
|
-define(ERROR11, 111). %% Plugin has been loaded
|
||||||
-define(ERROR12, 112). %% Client not online
|
-define(ERROR12, 112). %% Client not online
|
||||||
|
-define(ERROR13, 113). %% Modify config fail
|
||||||
|
|
||||||
|
|
|
@ -16,7 +16,13 @@
|
||||||
|
|
||||||
-module (emqttd_cli_config).
|
-module (emqttd_cli_config).
|
||||||
|
|
||||||
-export ([register_config_cli/0, register_config/0, run/1]).
|
-export ([register_config_cli/0,
|
||||||
|
register_config/0,
|
||||||
|
run/1,
|
||||||
|
set_usage/0,
|
||||||
|
all_cfgs/0,
|
||||||
|
get_cfg/2,
|
||||||
|
get_cfg/3]).
|
||||||
|
|
||||||
-define(APP, emqttd).
|
-define(APP, emqttd).
|
||||||
|
|
||||||
|
@ -46,6 +52,54 @@ register_config_cli() ->
|
||||||
register_broker_config(),
|
register_broker_config(),
|
||||||
register_lager_config().
|
register_lager_config().
|
||||||
|
|
||||||
|
set_usage() ->
|
||||||
|
io:format("~-40s# ~-20s# ~-20s ~p~n", ["key", "value", "datatype", "app"]),
|
||||||
|
io:format("------------------------------------------------------------------------------------------------~n"),
|
||||||
|
lists:foreach(fun({Key, Val, Datatype, App}) ->
|
||||||
|
io:format("~-40s# ~-20s# ~-20s ~p~n", [Key, Val, Datatype, App])
|
||||||
|
end, all_cfgs()),
|
||||||
|
io:format("------------------------------------------------------------------------------------------------~n"),
|
||||||
|
io:format("Usage: set key=value --app=appname~n").
|
||||||
|
|
||||||
|
all_cfgs() ->
|
||||||
|
{Mappings, Mappings1} = lists:foldl(
|
||||||
|
fun({Key, {_, Map, _}}, {Acc, Acc1}) ->
|
||||||
|
Map1 = lists:map(fun(M) -> {cuttlefish_mapping:variable(M), Key} end, Map),
|
||||||
|
{Acc ++ Map, Acc1 ++ Map1}
|
||||||
|
end, {[], []}, ets:tab2list(clique_schema)),
|
||||||
|
lists:foldl(fun({Key, _}, Acc) ->
|
||||||
|
case lists:keyfind(cuttlefish_variable:tokenize(Key), 2, Mappings) of
|
||||||
|
false -> Acc;
|
||||||
|
Map ->
|
||||||
|
Datatype = format_datatype(cuttlefish_mapping:datatype(Map)),
|
||||||
|
App = proplists:get_value(cuttlefish_variable:tokenize(Key), Mappings1),
|
||||||
|
[{_, [Val0]}] = clique_config:show([Key], [{app, App}]),
|
||||||
|
Val = any_to_string(proplists:get_value(Key, Val0)),
|
||||||
|
[{Key, Val, Datatype, App} | Acc]
|
||||||
|
end
|
||||||
|
end, [],lists:sort(ets:tab2list(clique_config))).
|
||||||
|
|
||||||
|
get_cfg(App, Key) ->
|
||||||
|
get_cfg(App, Key, undefined).
|
||||||
|
|
||||||
|
get_cfg(App, Key, Def) ->
|
||||||
|
[{_, [Val0]}] = clique_config:show([Key], [{app, App}]),
|
||||||
|
proplists:get_value(Key, Val0, Def).
|
||||||
|
|
||||||
|
format_datatype(Value) ->
|
||||||
|
format_datatype(Value, "").
|
||||||
|
|
||||||
|
format_datatype([Head], Acc) when is_tuple(Head) ->
|
||||||
|
[Head1 | _] = erlang:tuple_to_list(Head),
|
||||||
|
lists:concat([Acc, Head1]);
|
||||||
|
format_datatype([Head], Acc) ->
|
||||||
|
lists:concat([Acc, Head]);
|
||||||
|
format_datatype([Head | Tail], Acc) when is_tuple(Head)->
|
||||||
|
[Head1 | _] = erlang:tuple_to_list(Head),
|
||||||
|
format_datatype(Tail, Acc ++ lists:concat([Head1, ", "]));
|
||||||
|
format_datatype([Head | Tail], Acc) ->
|
||||||
|
format_datatype(Tail, Acc ++ lists:concat([Head, ", "])).
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Auth/Acl
|
%% Auth/Acl
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
@ -72,6 +126,8 @@ register_protocol_formatter() ->
|
||||||
"keepalive_backoff"],
|
"keepalive_backoff"],
|
||||||
[clique:register_formatter(["mqtt", Key], fun protocol_formatter_callback/2) || Key <- ConfigKeys].
|
[clique:register_formatter(["mqtt", Key], fun protocol_formatter_callback/2) || Key <- ConfigKeys].
|
||||||
|
|
||||||
|
protocol_formatter_callback([_, "websocket_protocol_header"], Params) ->
|
||||||
|
Params;
|
||||||
protocol_formatter_callback([_, Key], Params) ->
|
protocol_formatter_callback([_, Key], Params) ->
|
||||||
proplists:get_value(l2a(Key), Params).
|
proplists:get_value(l2a(Key), Params).
|
||||||
|
|
||||||
|
@ -85,6 +141,9 @@ register_protocol_config() ->
|
||||||
|
|
||||||
protocol_config_callback([_AppStr, KeyStr], Value) ->
|
protocol_config_callback([_AppStr, KeyStr], Value) ->
|
||||||
protocol_config_callback(protocol, l2a(KeyStr), Value).
|
protocol_config_callback(protocol, l2a(KeyStr), Value).
|
||||||
|
protocol_config_callback(_App, websocket_protocol_header, Value) ->
|
||||||
|
application:set_env(?APP, websocket_protocol_header, Value),
|
||||||
|
" successfully\n";
|
||||||
protocol_config_callback(App, Key, Value) ->
|
protocol_config_callback(App, Key, Value) ->
|
||||||
{ok, Env} = emqttd:env(App),
|
{ok, Env} = emqttd:env(App),
|
||||||
application:set_env(?APP, App, lists:keyreplace(Key, 1, Env, {Key, Value})),
|
application:set_env(?APP, App, lists:keyreplace(Key, 1, Env, {Key, Value})),
|
||||||
|
@ -126,6 +185,15 @@ register_client_config() ->
|
||||||
|
|
||||||
client_config_callback([_, AppStr, KeyStr], Value) ->
|
client_config_callback([_, AppStr, KeyStr], Value) ->
|
||||||
client_config_callback(l2a(AppStr), l2a(KeyStr), Value).
|
client_config_callback(l2a(AppStr), l2a(KeyStr), Value).
|
||||||
|
|
||||||
|
client_config_callback(App, idle_timeout, Value) ->
|
||||||
|
{ok, Env} = emqttd:env(App),
|
||||||
|
application:set_env(?APP, App, lists:keyreplace(client_idle_timeout, 1, Env, {client_idle_timeout, Value})),
|
||||||
|
" successfully\n";
|
||||||
|
client_config_callback(App, enable_stats, Value) ->
|
||||||
|
{ok, Env} = emqttd:env(App),
|
||||||
|
application:set_env(?APP, App, lists:keyreplace(client_enable_stats, 1, Env, {client_enable_stats, Value})),
|
||||||
|
" successfully\n";
|
||||||
client_config_callback(App, Key, Value) ->
|
client_config_callback(App, Key, Value) ->
|
||||||
{ok, Env} = emqttd:env(App),
|
{ok, Env} = emqttd:env(App),
|
||||||
application:set_env(?APP, App, lists:keyreplace(Key, 1, Env, {Key, Value})),
|
application:set_env(?APP, App, lists:keyreplace(Key, 1, Env, {Key, Value})),
|
||||||
|
@ -200,6 +268,23 @@ register_queue_config() ->
|
||||||
|
|
||||||
queue_config_callback([_, AppStr, KeyStr], Value) ->
|
queue_config_callback([_, AppStr, KeyStr], Value) ->
|
||||||
queue_config_callback(l2a(AppStr), l2a(KeyStr), Value).
|
queue_config_callback(l2a(AppStr), l2a(KeyStr), Value).
|
||||||
|
|
||||||
|
queue_config_callback(App, low_watermark, Value) ->
|
||||||
|
{ok, Env} = emqttd:env(App),
|
||||||
|
Parse = fun(S) ->
|
||||||
|
{match, [N]} = re:run(S, "^([0-9]+)%$", [{capture, all_but_first, list}]),
|
||||||
|
list_to_integer(N) / 100
|
||||||
|
end,
|
||||||
|
application:set_env(?APP, App, lists:keyreplace(low_watermark, 1, Env, {low_watermark, Parse(Value)})),
|
||||||
|
" successfully\n";
|
||||||
|
queue_config_callback(App, high_watermark, Value) ->
|
||||||
|
{ok, Env} = emqttd:env(App),
|
||||||
|
Parse = fun(S) ->
|
||||||
|
{match, [N]} = re:run(S, "^([0-9]+)%$", [{capture, all_but_first, list}]),
|
||||||
|
list_to_integer(N) / 100
|
||||||
|
end,
|
||||||
|
application:set_env(?APP, App, lists:keyreplace(high_watermark, 1, Env, {high_watermark, Parse(Value)})),
|
||||||
|
" successfully\n";
|
||||||
queue_config_callback(App, Key, Value) ->
|
queue_config_callback(App, Key, Value) ->
|
||||||
{ok, Env} = emqttd:env(App),
|
{ok, Env} = emqttd:env(App),
|
||||||
application:set_env(?APP, App, lists:keyreplace(Key, 1, Env, {Key, Value})),
|
application:set_env(?APP, App, lists:keyreplace(Key, 1, Env, {Key, Value})),
|
||||||
|
@ -242,3 +327,16 @@ lager_config_callback(_, Value) ->
|
||||||
register_config_whitelist(ConfigKeys) ->
|
register_config_whitelist(ConfigKeys) ->
|
||||||
clique:register_config_whitelist(ConfigKeys, ?APP).
|
clique:register_config_whitelist(ConfigKeys, ?APP).
|
||||||
|
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Inner Function
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
any_to_string(I) when is_integer(I) ->
|
||||||
|
integer_to_list(I);
|
||||||
|
any_to_string(F) when is_float(F)->
|
||||||
|
float_to_list(F,[{decimals, 4}]);
|
||||||
|
any_to_string(A) when is_atom(A) ->
|
||||||
|
atom_to_list(A);
|
||||||
|
any_to_string(B) when is_binary(B) ->
|
||||||
|
binary_to_list(B);
|
||||||
|
any_to_string(L) when is_list(L) ->
|
||||||
|
L.
|
|
@ -34,7 +34,7 @@
|
||||||
-export([start_link/2]).
|
-export([start_link/2]).
|
||||||
|
|
||||||
%% Management and Monitor API
|
%% Management and Monitor API
|
||||||
-export([info/1, stats/1, kick/1]).
|
-export([info/1, stats/1, kick/1, clean_acl_cache/2]).
|
||||||
|
|
||||||
-export([set_rate_limit/2, get_rate_limit/1]).
|
-export([set_rate_limit/2, get_rate_limit/1]).
|
||||||
|
|
||||||
|
@ -92,6 +92,9 @@ unsubscribe(CPid, Topics) ->
|
||||||
session(CPid) ->
|
session(CPid) ->
|
||||||
gen_server2:call(CPid, session, infinity).
|
gen_server2:call(CPid, session, infinity).
|
||||||
|
|
||||||
|
clean_acl_cache(CPid, Topic) ->
|
||||||
|
gen_server2:call(CPid, {clean_acl_cache, Topic}).
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% gen_server Callbacks
|
%% gen_server Callbacks
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
@ -175,6 +178,10 @@ handle_call(get_rate_limit, _From, State = #client_state{rate_limit = Rl}) ->
|
||||||
handle_call(session, _From, State = #client_state{proto_state = ProtoState}) ->
|
handle_call(session, _From, State = #client_state{proto_state = ProtoState}) ->
|
||||||
reply(emqttd_protocol:session(ProtoState), State);
|
reply(emqttd_protocol:session(ProtoState), State);
|
||||||
|
|
||||||
|
handle_call({clean_acl_cache, Topic}, _From, State) ->
|
||||||
|
erase({acl, publish, Topic}),
|
||||||
|
reply(ok, State);
|
||||||
|
|
||||||
handle_call(Req, _From, State) ->
|
handle_call(Req, _From, State) ->
|
||||||
?UNEXPECTED_REQ(Req, State).
|
?UNEXPECTED_REQ(Req, State).
|
||||||
|
|
||||||
|
|
|
@ -52,15 +52,21 @@ dump(_App, _Terms) ->
|
||||||
%% TODO
|
%% TODO
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
-spec(set(atom(), atom(), term()) -> ok).
|
-spec(set(atom(), list(), list()) -> ok).
|
||||||
set(App, Par, Val) ->
|
set(App, Par, Val) ->
|
||||||
application:set_env(App, Par, Val).
|
emqttd_cli_config:run(["config",
|
||||||
|
"set",
|
||||||
|
lists:concat([Par, "=", Val]),
|
||||||
|
lists:concat(["--app=", App])]).
|
||||||
|
|
||||||
-spec(get(atom(), atom()) -> undefined | {ok, term()}).
|
-spec(get(atom(), list()) -> undefined | {ok, term()}).
|
||||||
get(App, Par) ->
|
get(App, Par) ->
|
||||||
application:get_env(App, Par).
|
case emqttd_cli_config:get_cfg(App, Par) of
|
||||||
|
undefined -> undefined;
|
||||||
|
Val -> {ok, Val}
|
||||||
|
end.
|
||||||
|
|
||||||
-spec(get(atom(), atom(), atom()) -> term()).
|
-spec(get(atom(), list(), atom()) -> term()).
|
||||||
get(App, Par, Def) ->
|
get(App, Par, Def) ->
|
||||||
application:get_env(App, Par, Def).
|
emqttd_cli_config:get_cfg(App, Par, Def).
|
||||||
|
|
||||||
|
|
|
@ -68,6 +68,9 @@ run([]) -> usage(), ok;
|
||||||
|
|
||||||
run(["help"]) -> usage(), ok;
|
run(["help"]) -> usage(), ok;
|
||||||
|
|
||||||
|
run(["set"] = CmdS) when length(CmdS) =:= 1 ->
|
||||||
|
emqttd_cli_config:set_usage(), ok;
|
||||||
|
|
||||||
run(["set" | _] = CmdS) ->
|
run(["set" | _] = CmdS) ->
|
||||||
emqttd_cli_config:run(["config" | CmdS]), ok;
|
emqttd_cli_config:run(["config" | CmdS]), ok;
|
||||||
|
|
||||||
|
|
|
@ -41,7 +41,9 @@
|
||||||
|
|
||||||
-export([publish/1, subscribe/1, unsubscribe/1]).
|
-export([publish/1, subscribe/1, unsubscribe/1]).
|
||||||
|
|
||||||
-export([kick_client/1]).
|
-export([kick_client/1, clean_acl_cache/2]).
|
||||||
|
|
||||||
|
-export([modify_config/3, modify_config/4, get_configs/0, get_config/1]).
|
||||||
|
|
||||||
-define(KB, 1024).
|
-define(KB, 1024).
|
||||||
-define(MB, (1024*1024)).
|
-define(MB, (1024*1024)).
|
||||||
|
@ -289,10 +291,7 @@ unsubscribe({ClientId, Topic})->
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
kick_client(ClientId) ->
|
kick_client(ClientId) ->
|
||||||
Result = [kick_client(Node, ClientId) || Node <- ekka_mnesia:running_nodes()],
|
Result = [kick_client(Node, ClientId) || Node <- ekka_mnesia:running_nodes()],
|
||||||
case lists:any(fun(Item) -> Item =:= ok end, Result) of
|
lists:any(fun(Item) -> Item =:= ok end, Result).
|
||||||
true -> {ok, [{status, success}]};
|
|
||||||
false -> {ok, [{status, failure}]}
|
|
||||||
end.
|
|
||||||
|
|
||||||
kick_client(Node, ClientId) when Node =:= node() ->
|
kick_client(Node, ClientId) when Node =:= node() ->
|
||||||
case emqttd_cm:lookup(ClientId) of
|
case emqttd_cm:lookup(ClientId) of
|
||||||
|
@ -302,6 +301,39 @@ kick_client(Node, ClientId) when Node =:= node() ->
|
||||||
kick_client(Node, ClientId) ->
|
kick_client(Node, ClientId) ->
|
||||||
rpc_call(Node, kick_client, [Node, ClientId]).
|
rpc_call(Node, kick_client, [Node, ClientId]).
|
||||||
|
|
||||||
|
|
||||||
|
clean_acl_cache(ClientId, Topic) ->
|
||||||
|
Result = [clean_acl_cache(Node, ClientId, Topic) || Node <- ekka_mnesia:running_nodes()],
|
||||||
|
lists:any(fun(Item) -> Item =:= ok end, Result).
|
||||||
|
|
||||||
|
clean_acl_cache(Node, ClientId, Topic) when Node =:= node() ->
|
||||||
|
case emqttd_cm:lookup(ClientId) of
|
||||||
|
undefined -> error;
|
||||||
|
#mqtt_client{client_pid = Pid}-> emqttd_client:clean_acl_cache(Pid, Topic)
|
||||||
|
end;
|
||||||
|
clean_acl_cache(Node, ClientId, Topic) ->
|
||||||
|
rpc_call(Node, clean_acl_cache, [Node, ClientId, Topic]).
|
||||||
|
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Config ENV
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
modify_config(App, Key, Value) ->
|
||||||
|
Result = [modify_config(Node, App, Key, Value) || Node <- ekka_mnesia:running_nodes()],
|
||||||
|
lists:any(fun(Item) -> Item =:= ok end, Result).
|
||||||
|
|
||||||
|
modify_config(Node, App, Key, Value) when Node =:= node() ->
|
||||||
|
emqttd_config:set(App, Key, Value);
|
||||||
|
modify_config(Node, App, Key, Value) ->
|
||||||
|
rpc_call(Node, modify_config, [Node, App, Key, Value]).
|
||||||
|
|
||||||
|
get_configs() ->
|
||||||
|
[{Node, get_config(Node)} || Node <- ekka_mnesia:running_nodes()].
|
||||||
|
|
||||||
|
get_config(Node) when Node =:= node()->
|
||||||
|
emqttd_cli_config:all_cfgs();
|
||||||
|
get_config(Node) ->
|
||||||
|
rpc_call(Node, get_config, [Node]).
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Internel Functions.
|
%% Internel Functions.
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
|
@ -24,7 +24,8 @@
|
||||||
-http_api({"^nodes/(.+?)/clients/?$", 'GET', client_list, []}).
|
-http_api({"^nodes/(.+?)/clients/?$", 'GET', client_list, []}).
|
||||||
-http_api({"^nodes/(.+?)/clients/(.+?)/?$", 'GET',client_list, []}).
|
-http_api({"^nodes/(.+?)/clients/(.+?)/?$", 'GET',client_list, []}).
|
||||||
-http_api({"^clients/(.+?)/?$", 'GET', client, []}).
|
-http_api({"^clients/(.+?)/?$", 'GET', client, []}).
|
||||||
-http_api({"^kick_client/(.+?)/?$", 'PUT', kick_client, []}).
|
-http_api({"^clients/(.+?)/?$", 'DELETE', kick_client, []}).
|
||||||
|
-http_api({"^clean_acl_cache/(.+?)/?$", 'PUT', clean_acl_cache, [{<<"topic">>, binary}]}).
|
||||||
|
|
||||||
-http_api({"^routes?$", 'GET', route_list, []}).
|
-http_api({"^routes?$", 'GET', route_list, []}).
|
||||||
-http_api({"^routes/(.+?)/?$", 'GET', route, []}).
|
-http_api({"^routes/(.+?)/?$", 'GET', route, []}).
|
||||||
|
@ -55,14 +56,19 @@
|
||||||
-http_api({"^nodes/(.+?)/plugins/?$", 'GET', plugin_list, []}).
|
-http_api({"^nodes/(.+?)/plugins/?$", 'GET', plugin_list, []}).
|
||||||
-http_api({"^nodes/(.+?)/plugins/(.+?)/?$", 'PUT', enabled, [{<<"active">>, bool}]}).
|
-http_api({"^nodes/(.+?)/plugins/(.+?)/?$", 'PUT', enabled, [{<<"active">>, bool}]}).
|
||||||
|
|
||||||
|
-http_api({"^configs/(.+?)/?$", 'PUT', modify_config, [{<<"key">>, binary}, {<<"value">>, binary}]}).
|
||||||
|
-http_api({"^configs/?$", 'GET', config_list, []}).
|
||||||
|
-http_api({"^nodes/(.+?)/configs/(.+?)/?$", 'PUT', modify_config, [{<<"key">>, binary}, {<<"value">>, binary}]}).
|
||||||
|
-http_api({"^nodes/(.+?)/configs/?$", 'GET', config_list, []}).
|
||||||
-export([alarm_list/3]).
|
-export([alarm_list/3]).
|
||||||
-export([client/3, client_list/3, client_list/4, kick_client/3]).
|
-export([client/3, client_list/3, client_list/4, kick_client/3, clean_acl_cache/3]).
|
||||||
-export([route/3, route_list/2]).
|
-export([route/3, route_list/2]).
|
||||||
-export([session/3, session_list/3, session_list/4]).
|
-export([session/3, session_list/3, session_list/4]).
|
||||||
-export([subscription/3, subscription_list/3, subscription_list/4]).
|
-export([subscription/3, subscription_list/3, subscription_list/4]).
|
||||||
-export([nodes/2, node/3, brokers/2, broker/3, listeners/2, listener/3, metrics/2, metric/3, stats/2, stat/3]).
|
-export([nodes/2, node/3, brokers/2, broker/3, listeners/2, listener/3, metrics/2, metric/3, stats/2, stat/3]).
|
||||||
-export([publish/2, subscribe/2, unsubscribe/2]).
|
-export([publish/2, subscribe/2, unsubscribe/2]).
|
||||||
-export([plugin_list/3, enabled/4]).
|
-export([plugin_list/3, enabled/4]).
|
||||||
|
-export([modify_config/3, modify_config/4, config_list/2, config_list/3]).
|
||||||
|
|
||||||
%%--------------------------------------------------------------------------
|
%%--------------------------------------------------------------------------
|
||||||
%% alarm
|
%% alarm
|
||||||
|
@ -106,10 +112,17 @@ client_list('GET', Params, Node, Key) ->
|
||||||
Data = emqttd_mgmt:client_list(l2a(Node), l2b(Key), PageNo, PageSize),
|
Data = emqttd_mgmt:client_list(l2a(Node), l2b(Key), PageNo, PageSize),
|
||||||
{ok, [{objects, [client_row(Row) || Row <- Data]}]}.
|
{ok, [{objects, [client_row(Row) || Row <- Data]}]}.
|
||||||
|
|
||||||
kick_client('PUT', _Params, Key) ->
|
kick_client('DELETE', _Params, Key) ->
|
||||||
case emqttd_mgmt:kick_client(l2b(Key)) of
|
case emqttd_mgmt:kick_client(l2b(Key)) of
|
||||||
ok -> {ok, []};
|
true -> {ok, []};
|
||||||
error -> {error, [{code, ?ERROR12}]}
|
false -> {error, [{code, ?ERROR12}]}
|
||||||
|
end.
|
||||||
|
|
||||||
|
clean_acl_cache('PUT', Params, Key) ->
|
||||||
|
Topic = proplists:get_value(<<"topic">>, Params),
|
||||||
|
case emqttd_mgmt:clean_acl_cache(l2b(Key), Topic) of
|
||||||
|
true -> {ok, []};
|
||||||
|
false -> {error, [{code, ?ERROR12}]}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
client_row(#mqtt_client{client_id = ClientId,
|
client_row(#mqtt_client{client_id = ClientId,
|
||||||
|
@ -355,6 +368,44 @@ plugin(#mqtt_plugin{name = Name, version = Ver, descr = Descr,
|
||||||
{description, iolist_to_binary(Descr)},
|
{description, iolist_to_binary(Descr)},
|
||||||
{active, Active}].
|
{active, Active}].
|
||||||
|
|
||||||
|
%%--------------------------------------------------------------------------
|
||||||
|
%% modify config
|
||||||
|
%%--------------------------------------------------------------------------
|
||||||
|
modify_config('PUT', Params, App) ->
|
||||||
|
Key = proplists:get_value(<<"key">>, Params, <<"">>),
|
||||||
|
Value = proplists:get_value(<<"value">>, Params, <<"">>),
|
||||||
|
case emqttd_mgmt:modify_config(l2a(App), b2l(Key), b2l(Value)) of
|
||||||
|
true -> {ok, []};
|
||||||
|
false -> {error, [{code, ?ERROR13}]}
|
||||||
|
end.
|
||||||
|
|
||||||
|
modify_config('PUT', Params, Node, App) ->
|
||||||
|
Key = proplists:get_value(<<"key">>, Params, <<"">>),
|
||||||
|
Value = proplists:get_value(<<"value">>, Params, <<"">>),
|
||||||
|
case emqttd_mgmt:modify_config(l2a(Node), l2a(App), b2l(Key), b2l(Value)) of
|
||||||
|
ok -> {ok, []};
|
||||||
|
_ -> {error, [{code, ?ERROR13}]}
|
||||||
|
end.
|
||||||
|
|
||||||
|
config_list('GET', _Params) ->
|
||||||
|
Data = emqttd_mgmt:get_configs(),
|
||||||
|
{ok, [{Node, format_config(Config, [])} || {Node, Config} <- Data]}.
|
||||||
|
|
||||||
|
config_list('GET', _Params, Node) ->
|
||||||
|
Data = emqttd_mgmt:get_config(l2a(Node)),
|
||||||
|
{ok, [format_config(Config) || Config <- lists:reverse(Data)]}.
|
||||||
|
|
||||||
|
format_config([], Acc) ->
|
||||||
|
Acc;
|
||||||
|
format_config([{Key, Value, Datatpye, App}| Configs], Acc) ->
|
||||||
|
format_config(Configs, [format_config({Key, Value, Datatpye, App}) | Acc]).
|
||||||
|
|
||||||
|
format_config({Key, Value, Datatpye, App}) ->
|
||||||
|
[{<<"key">>, l2b(Key)},
|
||||||
|
{<<"value">>, l2b(Value)},
|
||||||
|
{<<"datatpye">>, l2b(Datatpye)},
|
||||||
|
{<<"app">>, App}].
|
||||||
|
|
||||||
%%--------------------------------------------------------------------------
|
%%--------------------------------------------------------------------------
|
||||||
%% Inner function
|
%% Inner function
|
||||||
%%--------------------------------------------------------------------------
|
%%--------------------------------------------------------------------------
|
||||||
|
@ -391,6 +442,7 @@ bin(undefined) -> <<>>.
|
||||||
int(L) -> list_to_integer(L).
|
int(L) -> list_to_integer(L).
|
||||||
l2a(L) -> list_to_atom(L).
|
l2a(L) -> list_to_atom(L).
|
||||||
l2b(L) -> list_to_binary(L).
|
l2b(L) -> list_to_binary(L).
|
||||||
|
b2l(B) -> binary_to_list(B).
|
||||||
|
|
||||||
|
|
||||||
page_params(Params) ->
|
page_params(Params) ->
|
||||||
|
|
|
@ -34,7 +34,7 @@
|
||||||
-export([start_link/4]).
|
-export([start_link/4]).
|
||||||
|
|
||||||
%% Management and Monitor API
|
%% Management and Monitor API
|
||||||
-export([info/1, stats/1, kick/1]).
|
-export([info/1, stats/1, kick/1, clean_acl_cache/2]).
|
||||||
|
|
||||||
%% SUB/UNSUB Asynchronously
|
%% SUB/UNSUB Asynchronously
|
||||||
-export([subscribe/2, unsubscribe/2]).
|
-export([subscribe/2, unsubscribe/2]).
|
||||||
|
@ -82,6 +82,9 @@ unsubscribe(CPid, Topics) ->
|
||||||
session(CPid) ->
|
session(CPid) ->
|
||||||
gen_server2:call(CPid, session).
|
gen_server2:call(CPid, session).
|
||||||
|
|
||||||
|
clean_acl_cache(CPid, Topic) ->
|
||||||
|
gen_server2:call(CPid, {clean_acl_cache, Topic}).
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% gen_server Callbacks
|
%% gen_server Callbacks
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
@ -133,6 +136,10 @@ handle_call(kick, _From, State) ->
|
||||||
handle_call(session, _From, State = #wsclient_state{proto_state = ProtoState}) ->
|
handle_call(session, _From, State = #wsclient_state{proto_state = ProtoState}) ->
|
||||||
reply(emqttd_protocol:session(ProtoState), State);
|
reply(emqttd_protocol:session(ProtoState), State);
|
||||||
|
|
||||||
|
handle_call({clean_acl_cache, Topic}, _From, State) ->
|
||||||
|
erase({acl, publish, Topic}),
|
||||||
|
reply(ok, State);
|
||||||
|
|
||||||
handle_call(Req, _From, State) ->
|
handle_call(Req, _From, State) ->
|
||||||
?WSLOG(error, "Unexpected request: ~p", [Req], State),
|
?WSLOG(error, "Unexpected request: ~p", [Req], State),
|
||||||
reply({error, unexpected_request}, State).
|
reply({error, unexpected_request}, State).
|
||||||
|
|
|
@ -20,7 +20,130 @@
|
||||||
|
|
||||||
-include("emqttd.hrl").
|
-include("emqttd.hrl").
|
||||||
|
|
||||||
all() -> [].
|
-include_lib("eunit/include/eunit.hrl").
|
||||||
|
|
||||||
groups() -> [].
|
-include_lib("common_test/include/ct.hrl").
|
||||||
|
|
||||||
|
all() ->
|
||||||
|
[{group, emq_config}].
|
||||||
|
|
||||||
|
groups() ->
|
||||||
|
[{emq_config, [sequence],
|
||||||
|
[run_protocol_cmd,
|
||||||
|
run_client_cmd,
|
||||||
|
run_session_cmd,
|
||||||
|
run_queue_cmd,
|
||||||
|
run_auth_cmd,
|
||||||
|
run_lager_cmd,
|
||||||
|
run_connection_cmd,
|
||||||
|
run_broker_config]
|
||||||
|
}].
|
||||||
|
|
||||||
|
init_per_suite(Config) ->
|
||||||
|
Config.
|
||||||
|
|
||||||
|
end_per_suite(Config) ->
|
||||||
|
Config.
|
||||||
|
|
||||||
|
run_protocol_cmd(_Config) ->
|
||||||
|
SetConfigKeys = [{"max_clientid_len=2048", int},
|
||||||
|
{"max_packet_size=1024", int},
|
||||||
|
% {"websocket_protocol_header=off", atom},
|
||||||
|
{"keepalive_backoff=0.5", float}],
|
||||||
|
lists:foreach(fun set_cmd/1, SetConfigKeys),
|
||||||
|
R = lists:sort(lists:map(fun env_value/1, SetConfigKeys)),
|
||||||
|
{ok, E} = application:get_env(emqttd, protocol),
|
||||||
|
?assertEqual(R, lists:sort(E)),
|
||||||
|
emqttd_cli_config:run(["config", "set", "mqtt.websocket_protocol_header=off", "--app=emqttd"]),
|
||||||
|
{ok, E1} = application:get_env(emqttd, websocket_protocol_header),
|
||||||
|
?assertEqual(false, E1).
|
||||||
|
|
||||||
|
run_client_cmd(_Config) ->
|
||||||
|
SetConfigKeys = [{"max_publish_rate=100", int},
|
||||||
|
{"idle_timeout=60s", date},
|
||||||
|
{"enable_stats=on", atom}],
|
||||||
|
lists:foreach(fun(Key) -> set_cmd("client", Key) end, SetConfigKeys),
|
||||||
|
R = lists:sort(lists:map(fun(Key) -> env_value("client", Key) end, SetConfigKeys)),
|
||||||
|
{ok, E} = application:get_env(emqttd, client),
|
||||||
|
?assertEqual(R, lists:sort(E)).
|
||||||
|
|
||||||
|
run_session_cmd(_Config) ->
|
||||||
|
SetConfigKeys = [{"max_subscriptions=5", int},
|
||||||
|
{"upgrade_qos=on", atom},
|
||||||
|
{"max_inflight=64", int},
|
||||||
|
{"retry_interval=60s", date},
|
||||||
|
{"max_awaiting_rel=200", int},
|
||||||
|
{"await_rel_timeout=60s",date},
|
||||||
|
{"enable_stats=on", atom},
|
||||||
|
{"expiry_interval=60s", date},
|
||||||
|
{"ignore_loop_deliver=true", atom}],
|
||||||
|
lists:foreach(fun(Key) -> set_cmd("session", Key) end, SetConfigKeys),
|
||||||
|
R = lists:sort(lists:map(fun env_value/1, SetConfigKeys)),
|
||||||
|
{ok, E} = application:get_env(emqttd, session),
|
||||||
|
?assertEqual(R, lists:sort(E)).
|
||||||
|
|
||||||
|
run_queue_cmd(_Config) ->
|
||||||
|
SetConfigKeys = [{"type=priority", atom},
|
||||||
|
{"priority=hah", string},
|
||||||
|
{"max_length=2000", int},
|
||||||
|
{"low_watermark=40%",percent},
|
||||||
|
{"high_watermark=80%", percent},
|
||||||
|
{"store_qos0=false", atom}],
|
||||||
|
lists:foreach(fun(Key) -> set_cmd("mqueue", Key) end, SetConfigKeys),
|
||||||
|
R = lists:sort(lists:map(fun env_value/1, SetConfigKeys)),
|
||||||
|
{ok, E} = application:get_env(emqttd, mqueue),
|
||||||
|
?assertEqual(R, lists:sort(E)).
|
||||||
|
|
||||||
|
run_auth_cmd(_Config) ->
|
||||||
|
SetConfigKeys = [{"allow_anonymous=true", atom},
|
||||||
|
{"acl_nomatch=deny", atom},
|
||||||
|
{"acl_file=etc/test.acl", string},
|
||||||
|
{"cache_acl=false", atom}],
|
||||||
|
lists:foreach(fun set_cmd/1, SetConfigKeys),
|
||||||
|
{ok, true} = application:get_env(emqttd, allow_anonymous),
|
||||||
|
{ok, deny} = application:get_env(emqttd, acl_nomatch),
|
||||||
|
{ok, "etc/test.acl"} = application:get_env(emqttd, acl_file),
|
||||||
|
{ok, false} = application:get_env(emqttd, cache_acl).
|
||||||
|
|
||||||
|
run_lager_cmd(_Config) ->
|
||||||
|
emqttd_cli_config:run(["config", "set", "log.console.level=info", "--app=emqttd"]),
|
||||||
|
ok.
|
||||||
|
|
||||||
|
run_connection_cmd(_Config) ->
|
||||||
|
emqttd_cli_config:run(["config", "set", "mqtt.conn.force_gc_count=1000", "--app=emqttd"]),
|
||||||
|
{ok, E} = application:get_env(emqttd, conn_force_gc_count),
|
||||||
|
?assertEqual(1000, E).
|
||||||
|
|
||||||
|
run_broker_config(_Config) ->
|
||||||
|
emqttd_cli_config:run(["config", "set", "mqtt.broker.sys_interval=10", "--app=emqttd"]),
|
||||||
|
{ok, E} = application:get_env(emqttd, broker_sys_interval),
|
||||||
|
?assertEqual(10, E).
|
||||||
|
|
||||||
|
env_value("client", {Key, Type}) ->
|
||||||
|
case string:split(Key, "=") of
|
||||||
|
["max_publish_rate", V] ->
|
||||||
|
{list_to_atom("max_publish_rate"), format(Type, V)};
|
||||||
|
[K, V] ->
|
||||||
|
{list_to_atom(string:join(["client", K], "_")), format(Type, V)}
|
||||||
|
end.
|
||||||
|
|
||||||
|
env_value({Key, Type}) ->
|
||||||
|
[K, V] = string:split(Key, "="),
|
||||||
|
{list_to_atom(K), format(Type, V)}.
|
||||||
|
|
||||||
|
format(string, S) -> S;
|
||||||
|
format(atom, "on") -> true;
|
||||||
|
format(atom, "off") -> false;
|
||||||
|
format(atom, A) -> list_to_atom(A);
|
||||||
|
format(float, F) -> list_to_float(F);
|
||||||
|
format(percent, P) ->
|
||||||
|
{match, [N]} = re:run(P, "^([0-9]+)%$", [{capture, all_but_first, list}]),
|
||||||
|
list_to_integer(N) / 100;
|
||||||
|
format(int, I) -> list_to_integer(I);
|
||||||
|
format(date, _I) -> 60000.
|
||||||
|
|
||||||
|
set_cmd({Key, _Type}) ->
|
||||||
|
emqttd_cli_config:run(["config", "set", string:join(["mqtt", Key], "."), "--app=emqttd"]).
|
||||||
|
|
||||||
|
set_cmd(Pre, {Key, _Type}) ->
|
||||||
|
emqttd_cli_config:run(["config", "set", string:join(["mqtt", Pre, Key], "."), "--app=emqttd"]).
|
||||||
|
|
Loading…
Reference in New Issue