diff --git a/Makefile b/Makefile index d313936ac..8203004c8 100644 --- a/Makefile +++ b/Makefile @@ -36,7 +36,7 @@ EUNIT_OPTS = verbose CT_SUITES = emqttd emqttd_access emqttd_lib emqttd_inflight emqttd_mod \ emqttd_net emqttd_mqueue emqttd_protocol emqttd_topic \ - emqttd_trie emqttd_vm + emqttd_trie emqttd_vm emqttd_config CT_OPTS = -cover test/ct.cover.spec -erl_args -name emqttd_ct@127.0.0.1 diff --git a/include/emqttd_internal.hrl b/include/emqttd_internal.hrl index 68e2b6ba6..06398e31f 100644 --- a/include/emqttd_internal.hrl +++ b/include/emqttd_internal.hrl @@ -72,4 +72,5 @@ -define(ERROR10, 110). %% Plugin has been loaded -define(ERROR11, 111). %% Plugin has been loaded -define(ERROR12, 112). %% Client not online +-define(ERROR13, 113). %% Modify config fail diff --git a/src/emqttd_cli_config.erl b/src/emqttd_cli_config.erl index 0e0e29c4b..aca2a45bb 100644 --- a/src/emqttd_cli_config.erl +++ b/src/emqttd_cli_config.erl @@ -16,7 +16,13 @@ -module (emqttd_cli_config). --export ([register_config_cli/0, register_config/0, run/1]). +-export ([register_config_cli/0, + register_config/0, + run/1, + set_usage/0, + all_cfgs/0, + get_cfg/2, + get_cfg/3]). -define(APP, emqttd). @@ -46,6 +52,54 @@ register_config_cli() -> register_broker_config(), register_lager_config(). +set_usage() -> + io:format("~-40s# ~-20s# ~-20s ~p~n", ["key", "value", "datatype", "app"]), + io:format("------------------------------------------------------------------------------------------------~n"), + lists:foreach(fun({Key, Val, Datatype, App}) -> + io:format("~-40s# ~-20s# ~-20s ~p~n", [Key, Val, Datatype, App]) + end, all_cfgs()), + io:format("------------------------------------------------------------------------------------------------~n"), + io:format("Usage: set key=value --app=appname~n"). + +all_cfgs() -> + {Mappings, Mappings1} = lists:foldl( + fun({Key, {_, Map, _}}, {Acc, Acc1}) -> + Map1 = lists:map(fun(M) -> {cuttlefish_mapping:variable(M), Key} end, Map), + {Acc ++ Map, Acc1 ++ Map1} + end, {[], []}, ets:tab2list(clique_schema)), + lists:foldl(fun({Key, _}, Acc) -> + case lists:keyfind(cuttlefish_variable:tokenize(Key), 2, Mappings) of + false -> Acc; + Map -> + Datatype = format_datatype(cuttlefish_mapping:datatype(Map)), + App = proplists:get_value(cuttlefish_variable:tokenize(Key), Mappings1), + [{_, [Val0]}] = clique_config:show([Key], [{app, App}]), + Val = any_to_string(proplists:get_value(Key, Val0)), + [{Key, Val, Datatype, App} | Acc] + end + end, [],lists:sort(ets:tab2list(clique_config))). + +get_cfg(App, Key) -> + get_cfg(App, Key, undefined). + +get_cfg(App, Key, Def) -> + [{_, [Val0]}] = clique_config:show([Key], [{app, App}]), + proplists:get_value(Key, Val0, Def). + +format_datatype(Value) -> + format_datatype(Value, ""). + +format_datatype([Head], Acc) when is_tuple(Head) -> + [Head1 | _] = erlang:tuple_to_list(Head), + lists:concat([Acc, Head1]); +format_datatype([Head], Acc) -> + lists:concat([Acc, Head]); +format_datatype([Head | Tail], Acc) when is_tuple(Head)-> + [Head1 | _] = erlang:tuple_to_list(Head), + format_datatype(Tail, Acc ++ lists:concat([Head1, ", "])); +format_datatype([Head | Tail], Acc) -> + format_datatype(Tail, Acc ++ lists:concat([Head, ", "])). + %%-------------------------------------------------------------------- %% Auth/Acl %%-------------------------------------------------------------------- @@ -72,6 +126,8 @@ register_protocol_formatter() -> "keepalive_backoff"], [clique:register_formatter(["mqtt", Key], fun protocol_formatter_callback/2) || Key <- ConfigKeys]. +protocol_formatter_callback([_, "websocket_protocol_header"], Params) -> + Params; protocol_formatter_callback([_, Key], Params) -> proplists:get_value(l2a(Key), Params). @@ -85,6 +141,9 @@ register_protocol_config() -> protocol_config_callback([_AppStr, KeyStr], Value) -> protocol_config_callback(protocol, l2a(KeyStr), Value). +protocol_config_callback(_App, websocket_protocol_header, Value) -> + application:set_env(?APP, websocket_protocol_header, Value), + " successfully\n"; protocol_config_callback(App, Key, Value) -> {ok, Env} = emqttd:env(App), application:set_env(?APP, App, lists:keyreplace(Key, 1, Env, {Key, Value})), @@ -126,6 +185,15 @@ register_client_config() -> client_config_callback([_, AppStr, KeyStr], Value) -> client_config_callback(l2a(AppStr), l2a(KeyStr), Value). + +client_config_callback(App, idle_timeout, Value) -> + {ok, Env} = emqttd:env(App), + application:set_env(?APP, App, lists:keyreplace(client_idle_timeout, 1, Env, {client_idle_timeout, Value})), + " successfully\n"; +client_config_callback(App, enable_stats, Value) -> + {ok, Env} = emqttd:env(App), + application:set_env(?APP, App, lists:keyreplace(client_enable_stats, 1, Env, {client_enable_stats, Value})), + " successfully\n"; client_config_callback(App, Key, Value) -> {ok, Env} = emqttd:env(App), application:set_env(?APP, App, lists:keyreplace(Key, 1, Env, {Key, Value})), @@ -200,6 +268,23 @@ register_queue_config() -> queue_config_callback([_, AppStr, KeyStr], Value) -> queue_config_callback(l2a(AppStr), l2a(KeyStr), Value). + +queue_config_callback(App, low_watermark, Value) -> + {ok, Env} = emqttd:env(App), + Parse = fun(S) -> + {match, [N]} = re:run(S, "^([0-9]+)%$", [{capture, all_but_first, list}]), + list_to_integer(N) / 100 + end, + application:set_env(?APP, App, lists:keyreplace(low_watermark, 1, Env, {low_watermark, Parse(Value)})), + " successfully\n"; +queue_config_callback(App, high_watermark, Value) -> + {ok, Env} = emqttd:env(App), + Parse = fun(S) -> + {match, [N]} = re:run(S, "^([0-9]+)%$", [{capture, all_but_first, list}]), + list_to_integer(N) / 100 + end, + application:set_env(?APP, App, lists:keyreplace(high_watermark, 1, Env, {high_watermark, Parse(Value)})), + " successfully\n"; queue_config_callback(App, Key, Value) -> {ok, Env} = emqttd:env(App), application:set_env(?APP, App, lists:keyreplace(Key, 1, Env, {Key, Value})), @@ -242,3 +327,16 @@ lager_config_callback(_, Value) -> register_config_whitelist(ConfigKeys) -> clique:register_config_whitelist(ConfigKeys, ?APP). +%%-------------------------------------------------------------------- +%% Inner Function +%%-------------------------------------------------------------------- +any_to_string(I) when is_integer(I) -> + integer_to_list(I); +any_to_string(F) when is_float(F)-> + float_to_list(F,[{decimals, 4}]); +any_to_string(A) when is_atom(A) -> + atom_to_list(A); +any_to_string(B) when is_binary(B) -> + binary_to_list(B); +any_to_string(L) when is_list(L) -> + L. \ No newline at end of file diff --git a/src/emqttd_client.erl b/src/emqttd_client.erl index c7167c8a7..6631b4566 100644 --- a/src/emqttd_client.erl +++ b/src/emqttd_client.erl @@ -34,7 +34,7 @@ -export([start_link/2]). %% Management and Monitor API --export([info/1, stats/1, kick/1]). +-export([info/1, stats/1, kick/1, clean_acl_cache/2]). -export([set_rate_limit/2, get_rate_limit/1]). @@ -92,6 +92,9 @@ unsubscribe(CPid, Topics) -> session(CPid) -> gen_server2:call(CPid, session, infinity). +clean_acl_cache(CPid, Topic) -> + gen_server2:call(CPid, {clean_acl_cache, Topic}). + %%-------------------------------------------------------------------- %% gen_server Callbacks %%-------------------------------------------------------------------- @@ -175,6 +178,10 @@ handle_call(get_rate_limit, _From, State = #client_state{rate_limit = Rl}) -> handle_call(session, _From, State = #client_state{proto_state = ProtoState}) -> reply(emqttd_protocol:session(ProtoState), State); +handle_call({clean_acl_cache, Topic}, _From, State) -> + erase({acl, publish, Topic}), + reply(ok, State); + handle_call(Req, _From, State) -> ?UNEXPECTED_REQ(Req, State). diff --git a/src/emqttd_config.erl b/src/emqttd_config.erl index 06256bbfd..77a4e8949 100644 --- a/src/emqttd_config.erl +++ b/src/emqttd_config.erl @@ -52,15 +52,21 @@ dump(_App, _Terms) -> %% TODO ok. --spec(set(atom(), atom(), term()) -> ok). +-spec(set(atom(), list(), list()) -> ok). set(App, Par, Val) -> - application:set_env(App, Par, Val). + emqttd_cli_config:run(["config", + "set", + lists:concat([Par, "=", Val]), + lists:concat(["--app=", App])]). --spec(get(atom(), atom()) -> undefined | {ok, term()}). +-spec(get(atom(), list()) -> undefined | {ok, term()}). get(App, Par) -> - application:get_env(App, Par). + case emqttd_cli_config:get_cfg(App, Par) of + undefined -> undefined; + Val -> {ok, Val} + end. --spec(get(atom(), atom(), atom()) -> term()). +-spec(get(atom(), list(), atom()) -> term()). get(App, Par, Def) -> - application:get_env(App, Par, Def). + emqttd_cli_config:get_cfg(App, Par, Def). diff --git a/src/emqttd_ctl.erl b/src/emqttd_ctl.erl index 0cec222fd..77769e3c8 100644 --- a/src/emqttd_ctl.erl +++ b/src/emqttd_ctl.erl @@ -68,6 +68,9 @@ run([]) -> usage(), ok; run(["help"]) -> usage(), ok; +run(["set"] = CmdS) when length(CmdS) =:= 1 -> + emqttd_cli_config:set_usage(), ok; + run(["set" | _] = CmdS) -> emqttd_cli_config:run(["config" | CmdS]), ok; diff --git a/src/emqttd_mgmt.erl b/src/emqttd_mgmt.erl index a46f70d09..56675bbea 100644 --- a/src/emqttd_mgmt.erl +++ b/src/emqttd_mgmt.erl @@ -41,7 +41,9 @@ -export([publish/1, subscribe/1, unsubscribe/1]). --export([kick_client/1]). +-export([kick_client/1, clean_acl_cache/2]). + +-export([modify_config/3, modify_config/4, get_configs/0, get_config/1]). -define(KB, 1024). -define(MB, (1024*1024)). @@ -289,10 +291,7 @@ unsubscribe({ClientId, Topic})-> %%-------------------------------------------------------------------- kick_client(ClientId) -> Result = [kick_client(Node, ClientId) || Node <- ekka_mnesia:running_nodes()], - case lists:any(fun(Item) -> Item =:= ok end, Result) of - true -> {ok, [{status, success}]}; - false -> {ok, [{status, failure}]} - end. + lists:any(fun(Item) -> Item =:= ok end, Result). kick_client(Node, ClientId) when Node =:= node() -> case emqttd_cm:lookup(ClientId) of @@ -302,6 +301,39 @@ kick_client(Node, ClientId) when Node =:= node() -> kick_client(Node, ClientId) -> rpc_call(Node, kick_client, [Node, ClientId]). + +clean_acl_cache(ClientId, Topic) -> + Result = [clean_acl_cache(Node, ClientId, Topic) || Node <- ekka_mnesia:running_nodes()], + lists:any(fun(Item) -> Item =:= ok end, Result). + +clean_acl_cache(Node, ClientId, Topic) when Node =:= node() -> + case emqttd_cm:lookup(ClientId) of + undefined -> error; + #mqtt_client{client_pid = Pid}-> emqttd_client:clean_acl_cache(Pid, Topic) + end; +clean_acl_cache(Node, ClientId, Topic) -> + rpc_call(Node, clean_acl_cache, [Node, ClientId, Topic]). + +%%-------------------------------------------------------------------- +%% Config ENV +%%-------------------------------------------------------------------- +modify_config(App, Key, Value) -> + Result = [modify_config(Node, App, Key, Value) || Node <- ekka_mnesia:running_nodes()], + lists:any(fun(Item) -> Item =:= ok end, Result). + +modify_config(Node, App, Key, Value) when Node =:= node() -> + emqttd_config:set(App, Key, Value); +modify_config(Node, App, Key, Value) -> + rpc_call(Node, modify_config, [Node, App, Key, Value]). + +get_configs() -> + [{Node, get_config(Node)} || Node <- ekka_mnesia:running_nodes()]. + +get_config(Node) when Node =:= node()-> + emqttd_cli_config:all_cfgs(); +get_config(Node) -> + rpc_call(Node, get_config, [Node]). + %%-------------------------------------------------------------------- %% Internel Functions. %%-------------------------------------------------------------------- diff --git a/src/emqttd_rest_api.erl b/src/emqttd_rest_api.erl index 815081a4f..97d887415 100644 --- a/src/emqttd_rest_api.erl +++ b/src/emqttd_rest_api.erl @@ -24,7 +24,8 @@ -http_api({"^nodes/(.+?)/clients/?$", 'GET', client_list, []}). -http_api({"^nodes/(.+?)/clients/(.+?)/?$", 'GET',client_list, []}). -http_api({"^clients/(.+?)/?$", 'GET', client, []}). --http_api({"^kick_client/(.+?)/?$", 'PUT', kick_client, []}). +-http_api({"^clients/(.+?)/?$", 'DELETE', kick_client, []}). +-http_api({"^clean_acl_cache/(.+?)/?$", 'PUT', clean_acl_cache, [{<<"topic">>, binary}]}). -http_api({"^routes?$", 'GET', route_list, []}). -http_api({"^routes/(.+?)/?$", 'GET', route, []}). @@ -55,14 +56,19 @@ -http_api({"^nodes/(.+?)/plugins/?$", 'GET', plugin_list, []}). -http_api({"^nodes/(.+?)/plugins/(.+?)/?$", 'PUT', enabled, [{<<"active">>, bool}]}). +-http_api({"^configs/(.+?)/?$", 'PUT', modify_config, [{<<"key">>, binary}, {<<"value">>, binary}]}). +-http_api({"^configs/?$", 'GET', config_list, []}). +-http_api({"^nodes/(.+?)/configs/(.+?)/?$", 'PUT', modify_config, [{<<"key">>, binary}, {<<"value">>, binary}]}). +-http_api({"^nodes/(.+?)/configs/?$", 'GET', config_list, []}). -export([alarm_list/3]). --export([client/3, client_list/3, client_list/4, kick_client/3]). +-export([client/3, client_list/3, client_list/4, kick_client/3, clean_acl_cache/3]). -export([route/3, route_list/2]). -export([session/3, session_list/3, session_list/4]). -export([subscription/3, subscription_list/3, subscription_list/4]). -export([nodes/2, node/3, brokers/2, broker/3, listeners/2, listener/3, metrics/2, metric/3, stats/2, stat/3]). -export([publish/2, subscribe/2, unsubscribe/2]). -export([plugin_list/3, enabled/4]). +-export([modify_config/3, modify_config/4, config_list/2, config_list/3]). %%-------------------------------------------------------------------------- %% alarm @@ -106,10 +112,17 @@ client_list('GET', Params, Node, Key) -> Data = emqttd_mgmt:client_list(l2a(Node), l2b(Key), PageNo, PageSize), {ok, [{objects, [client_row(Row) || Row <- Data]}]}. -kick_client('PUT', _Params, Key) -> +kick_client('DELETE', _Params, Key) -> case emqttd_mgmt:kick_client(l2b(Key)) of - ok -> {ok, []}; - error -> {error, [{code, ?ERROR12}]} + true -> {ok, []}; + false -> {error, [{code, ?ERROR12}]} + end. + +clean_acl_cache('PUT', Params, Key) -> + Topic = proplists:get_value(<<"topic">>, Params), + case emqttd_mgmt:clean_acl_cache(l2b(Key), Topic) of + true -> {ok, []}; + false -> {error, [{code, ?ERROR12}]} end. client_row(#mqtt_client{client_id = ClientId, @@ -355,6 +368,44 @@ plugin(#mqtt_plugin{name = Name, version = Ver, descr = Descr, {description, iolist_to_binary(Descr)}, {active, Active}]. +%%-------------------------------------------------------------------------- +%% modify config +%%-------------------------------------------------------------------------- +modify_config('PUT', Params, App) -> + Key = proplists:get_value(<<"key">>, Params, <<"">>), + Value = proplists:get_value(<<"value">>, Params, <<"">>), + case emqttd_mgmt:modify_config(l2a(App), b2l(Key), b2l(Value)) of + true -> {ok, []}; + false -> {error, [{code, ?ERROR13}]} + end. + +modify_config('PUT', Params, Node, App) -> + Key = proplists:get_value(<<"key">>, Params, <<"">>), + Value = proplists:get_value(<<"value">>, Params, <<"">>), + case emqttd_mgmt:modify_config(l2a(Node), l2a(App), b2l(Key), b2l(Value)) of + ok -> {ok, []}; + _ -> {error, [{code, ?ERROR13}]} + end. + +config_list('GET', _Params) -> + Data = emqttd_mgmt:get_configs(), + {ok, [{Node, format_config(Config, [])} || {Node, Config} <- Data]}. + +config_list('GET', _Params, Node) -> + Data = emqttd_mgmt:get_config(l2a(Node)), + {ok, [format_config(Config) || Config <- lists:reverse(Data)]}. + +format_config([], Acc) -> + Acc; +format_config([{Key, Value, Datatpye, App}| Configs], Acc) -> + format_config(Configs, [format_config({Key, Value, Datatpye, App}) | Acc]). + +format_config({Key, Value, Datatpye, App}) -> + [{<<"key">>, l2b(Key)}, + {<<"value">>, l2b(Value)}, + {<<"datatpye">>, l2b(Datatpye)}, + {<<"app">>, App}]. + %%-------------------------------------------------------------------------- %% Inner function %%-------------------------------------------------------------------------- @@ -391,9 +442,10 @@ bin(undefined) -> <<>>. int(L) -> list_to_integer(L). l2a(L) -> list_to_atom(L). l2b(L) -> list_to_binary(L). +b2l(B) -> binary_to_list(B). page_params(Params) -> PageNo = int(proplists:get_value("curr_page", Params, "1")), PageSize = int(proplists:get_value("page_size", Params, "20")), - {PageNo, PageSize}. \ No newline at end of file + {PageNo, PageSize}. diff --git a/src/emqttd_ws_client.erl b/src/emqttd_ws_client.erl index 2433b2ea8..a583611a4 100644 --- a/src/emqttd_ws_client.erl +++ b/src/emqttd_ws_client.erl @@ -34,7 +34,7 @@ -export([start_link/4]). %% Management and Monitor API --export([info/1, stats/1, kick/1]). +-export([info/1, stats/1, kick/1, clean_acl_cache/2]). %% SUB/UNSUB Asynchronously -export([subscribe/2, unsubscribe/2]). @@ -82,6 +82,9 @@ unsubscribe(CPid, Topics) -> session(CPid) -> gen_server2:call(CPid, session). +clean_acl_cache(CPid, Topic) -> + gen_server2:call(CPid, {clean_acl_cache, Topic}). + %%-------------------------------------------------------------------- %% gen_server Callbacks %%-------------------------------------------------------------------- @@ -133,6 +136,10 @@ handle_call(kick, _From, State) -> handle_call(session, _From, State = #wsclient_state{proto_state = ProtoState}) -> reply(emqttd_protocol:session(ProtoState), State); +handle_call({clean_acl_cache, Topic}, _From, State) -> + erase({acl, publish, Topic}), + reply(ok, State); + handle_call(Req, _From, State) -> ?WSLOG(error, "Unexpected request: ~p", [Req], State), reply({error, unexpected_request}, State). diff --git a/test/emqttd_config_SUITE.erl b/test/emqttd_config_SUITE.erl index 6ea3f7ece..04c957b75 100644 --- a/test/emqttd_config_SUITE.erl +++ b/test/emqttd_config_SUITE.erl @@ -20,7 +20,130 @@ -include("emqttd.hrl"). -all() -> []. +-include_lib("eunit/include/eunit.hrl"). -groups() -> []. +-include_lib("common_test/include/ct.hrl"). +all() -> + [{group, emq_config}]. + +groups() -> + [{emq_config, [sequence], + [run_protocol_cmd, + run_client_cmd, + run_session_cmd, + run_queue_cmd, + run_auth_cmd, + run_lager_cmd, + run_connection_cmd, + run_broker_config] + }]. + +init_per_suite(Config) -> + Config. + +end_per_suite(Config) -> + Config. + +run_protocol_cmd(_Config) -> + SetConfigKeys = [{"max_clientid_len=2048", int}, + {"max_packet_size=1024", int}, + % {"websocket_protocol_header=off", atom}, + {"keepalive_backoff=0.5", float}], + lists:foreach(fun set_cmd/1, SetConfigKeys), + R = lists:sort(lists:map(fun env_value/1, SetConfigKeys)), + {ok, E} = application:get_env(emqttd, protocol), + ?assertEqual(R, lists:sort(E)), + emqttd_cli_config:run(["config", "set", "mqtt.websocket_protocol_header=off", "--app=emqttd"]), + {ok, E1} = application:get_env(emqttd, websocket_protocol_header), + ?assertEqual(false, E1). + +run_client_cmd(_Config) -> + SetConfigKeys = [{"max_publish_rate=100", int}, + {"idle_timeout=60s", date}, + {"enable_stats=on", atom}], + lists:foreach(fun(Key) -> set_cmd("client", Key) end, SetConfigKeys), + R = lists:sort(lists:map(fun(Key) -> env_value("client", Key) end, SetConfigKeys)), + {ok, E} = application:get_env(emqttd, client), + ?assertEqual(R, lists:sort(E)). + +run_session_cmd(_Config) -> + SetConfigKeys = [{"max_subscriptions=5", int}, + {"upgrade_qos=on", atom}, + {"max_inflight=64", int}, + {"retry_interval=60s", date}, + {"max_awaiting_rel=200", int}, + {"await_rel_timeout=60s",date}, + {"enable_stats=on", atom}, + {"expiry_interval=60s", date}, + {"ignore_loop_deliver=true", atom}], + lists:foreach(fun(Key) -> set_cmd("session", Key) end, SetConfigKeys), + R = lists:sort(lists:map(fun env_value/1, SetConfigKeys)), + {ok, E} = application:get_env(emqttd, session), + ?assertEqual(R, lists:sort(E)). + +run_queue_cmd(_Config) -> + SetConfigKeys = [{"type=priority", atom}, + {"priority=hah", string}, + {"max_length=2000", int}, + {"low_watermark=40%",percent}, + {"high_watermark=80%", percent}, + {"store_qos0=false", atom}], + lists:foreach(fun(Key) -> set_cmd("mqueue", Key) end, SetConfigKeys), + R = lists:sort(lists:map(fun env_value/1, SetConfigKeys)), + {ok, E} = application:get_env(emqttd, mqueue), + ?assertEqual(R, lists:sort(E)). + +run_auth_cmd(_Config) -> + SetConfigKeys = [{"allow_anonymous=true", atom}, + {"acl_nomatch=deny", atom}, + {"acl_file=etc/test.acl", string}, + {"cache_acl=false", atom}], + lists:foreach(fun set_cmd/1, SetConfigKeys), + {ok, true} = application:get_env(emqttd, allow_anonymous), + {ok, deny} = application:get_env(emqttd, acl_nomatch), + {ok, "etc/test.acl"} = application:get_env(emqttd, acl_file), + {ok, false} = application:get_env(emqttd, cache_acl). + +run_lager_cmd(_Config) -> + emqttd_cli_config:run(["config", "set", "log.console.level=info", "--app=emqttd"]), + ok. + +run_connection_cmd(_Config) -> + emqttd_cli_config:run(["config", "set", "mqtt.conn.force_gc_count=1000", "--app=emqttd"]), + {ok, E} = application:get_env(emqttd, conn_force_gc_count), + ?assertEqual(1000, E). + +run_broker_config(_Config) -> + emqttd_cli_config:run(["config", "set", "mqtt.broker.sys_interval=10", "--app=emqttd"]), + {ok, E} = application:get_env(emqttd, broker_sys_interval), + ?assertEqual(10, E). + +env_value("client", {Key, Type}) -> + case string:split(Key, "=") of + ["max_publish_rate", V] -> + {list_to_atom("max_publish_rate"), format(Type, V)}; + [K, V] -> + {list_to_atom(string:join(["client", K], "_")), format(Type, V)} + end. + +env_value({Key, Type}) -> + [K, V] = string:split(Key, "="), + {list_to_atom(K), format(Type, V)}. + +format(string, S) -> S; +format(atom, "on") -> true; +format(atom, "off") -> false; +format(atom, A) -> list_to_atom(A); +format(float, F) -> list_to_float(F); +format(percent, P) -> + {match, [N]} = re:run(P, "^([0-9]+)%$", [{capture, all_but_first, list}]), + list_to_integer(N) / 100; +format(int, I) -> list_to_integer(I); +format(date, _I) -> 60000. + +set_cmd({Key, _Type}) -> + emqttd_cli_config:run(["config", "set", string:join(["mqtt", Key], "."), "--app=emqttd"]). + +set_cmd(Pre, {Key, _Type}) -> + emqttd_cli_config:run(["config", "set", string:join(["mqtt", Pre, Key], "."), "--app=emqttd"]).