From a902f508b5858e3018ea92795917367cc15749c2 Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Mon, 9 Apr 2018 14:32:49 +0800 Subject: [PATCH] Use emqx_config:get_env/1 to read env --- src/emqx_access_control.erl | 4 ++-- src/emqx_app.erl | 2 +- src/emqx_bridge_sup_sup.erl | 2 +- src/emqx_cm.erl | 14 ++++---------- src/emqx_gc.erl | 2 +- src/emqx_log.erl | 20 +++++++++++++++----- src/emqx_modules.erl | 4 ++-- src/emqx_mqtt.erl | 9 +++++---- src/emqx_plugins.erl | 16 ++++++++-------- src/emqx_protocol.erl | 2 +- src/emqx_session.erl | 4 ++-- src/emqx_sysmon_sup.erl | 2 +- src/emqx_tracer.erl | 2 +- src/emqx_ws.erl | 6 +++--- src/emqx_ws_connection_sup.erl | 2 +- 15 files changed, 48 insertions(+), 43 deletions(-) diff --git a/src/emqx_access_control.erl b/src/emqx_access_control.erl index 7f665211e..988a625dc 100644 --- a/src/emqx_access_control.erl +++ b/src/emqx_access_control.erl @@ -51,7 +51,7 @@ start_link() -> auth(Client, Password) when is_record(Client, client) -> auth(Client, Password, lookup_mods(auth)). auth(_Client, _Password, []) -> - case emqx_conf:get_env(allow_anonymous, false) of + case emqx_config:get_env(allow_anonymous, false) of true -> ok; false -> {error, "No auth module to check!"} end; @@ -73,7 +73,7 @@ check_acl(Client, PubSub, Topic) when ?PS(PubSub) -> check_acl(Client, PubSub, Topic, lookup_mods(acl)). check_acl(_Client, _PubSub, _Topic, []) -> - emqx_conf:get_env(acl_nomatch, allow); + emqx_config:get_env(acl_nomatch, allow); check_acl(Client, PubSub, Topic, [{Mod, State, _Seq}|AclMods]) -> case Mod:check_acl({Client, PubSub, Topic}, State) of allow -> allow; diff --git a/src/emqx_app.erl b/src/emqx_app.erl index fc1774a7b..a4c0493d8 100644 --- a/src/emqx_app.erl +++ b/src/emqx_app.erl @@ -63,7 +63,7 @@ print_vsn() -> %%-------------------------------------------------------------------- register_acl_mod() -> - case emqx_conf:get_env(acl_file) of + case emqx_config:get_env(acl_file) of {ok, File} -> emqx_access_control:register_mod(acl, emqx_acl_internal, [File]); undefined -> ok end. diff --git a/src/emqx_bridge_sup_sup.erl b/src/emqx_bridge_sup_sup.erl index 88e75792e..7c26cd047 100644 --- a/src/emqx_bridge_sup_sup.erl +++ b/src/emqx_bridge_sup_sup.erl @@ -46,7 +46,7 @@ start_bridge(Node, Topic) when is_atom(Node) andalso is_binary(Topic) -> start_bridge(Node, _Topic, _Options) when Node =:= node() -> {error, bridge_to_self}; start_bridge(Node, Topic, Options) when is_atom(Node) andalso is_binary(Topic) -> - {ok, BridgeEnv} = emqx_conf:get_env(bridge), + {ok, BridgeEnv} = emqx_config:get_env(bridge), Options1 = emqx_misc:merge_opts(BridgeEnv, Options), supervisor:start_child(?MODULE, bridge_spec(Node, Topic, Options1)). diff --git a/src/emqx_cm.erl b/src/emqx_cm.erl index 568cbd951..5a2ff93b4 100644 --- a/src/emqx_cm.erl +++ b/src/emqx_cm.erl @@ -20,7 +20,7 @@ -include("emqx.hrl"). --export([start_link/1]). +-export([start_link/0]). -export([lookup/1, reg/1, unreg/1]). @@ -36,9 +36,9 @@ %%-------------------------------------------------------------------- %% @doc Start the client manager --spec(start_link(fun()) -> {ok, pid()} | ignore | {error, term()}). -start_link(StatsFun) -> - gen_server:start_link({local, ?SERVER}, ?MODULE, [StatsFun], []). +-spec(start_link() -> {ok, pid()} | ignore | {error, term()}). +start_link() -> + gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). %% @doc Lookup ClientPid by ClientId -spec(lookup(client_id()) -> pid() | undefined). @@ -102,9 +102,6 @@ handle_info({'DOWN', MRef, process, DownPid, _Reason}, State) -> {noreply, State} end; -handle_info(stats, State) -> - {noreply, setstats(State), hibernate}; - handle_info(Info, State) -> emqx_log:error("[CM] Unexpected info: ~p", [Info]), {noreply, State}. @@ -132,6 +129,3 @@ erase_monitor(MRef, State = #state{monitors = Monitors}) -> erlang:demonitor(MRef), State#state{monitors = dict:erase(MRef, Monitors)}. -setstats(State = #state{stats_fun = StatsFun}) -> - StatsFun(ets:info(client, size)), State. - diff --git a/src/emqx_gc.erl b/src/emqx_gc.erl index e0fbed270..4901cce5e 100644 --- a/src/emqx_gc.erl +++ b/src/emqx_gc.erl @@ -23,7 +23,7 @@ -spec(conn_max_gc_count() -> integer()). conn_max_gc_count() -> - case emqx:env(conn_force_gc_count) of + case emqx_config:get_env(conn_force_gc_count) of {ok, I} when I > 0 -> I + rand:uniform(I); {ok, I} when I =< 0 -> undefined; undefined -> undefined diff --git a/src/emqx_log.erl b/src/emqx_log.erl index 16b70b676..7f85b3309 100644 --- a/src/emqx_log.erl +++ b/src/emqx_log.erl @@ -18,34 +18,44 @@ -compile({no_auto_import,[error/1]}). --export([debug/1, debug/2, - info/1, info/2, - warning/1, warning/2, - error/1, error/2, - critical/1, critical/2]). +-export([debug/1, debug/2, debug/3, + info/1, info/2, info/3, + warning/1, warning/2, warning/3, + error/1, error/2, error/3, + critical/1, critical/2, critical/3]). debug(Msg) -> lager:debug(Msg). debug(Format, Args) -> lager:debug(Format, Args). +debug(Metadata, Format, Args) when is_list(Metadata) -> + lager:debug(Metadata, Format, Args). info(Msg) -> lager:info(Msg). info(Format, Args) -> lager:info(Format, Args). +info(Metadata, Format, Args) when is_list(Metadata) -> + lager:info(Metadata, Format, Args). warning(Msg) -> lager:warning(Msg). warning(Format, Args) -> lager:warning(Format, Args). +warning(Metadata, Format, Args) when is_list(Metadata) -> + lager:warning(Metadata, Format, Args). error(Msg) -> lager:error(Msg). error(Format, Args) -> lager:error(Format, Args). +error(Metadata, Format, Args) when is_list(Metadata) -> + lager:error(Metadata, Format, Args). critical(Msg) -> lager:critical(Msg). critical(Format, Args) -> lager:critical(Format, Args). +critical(Metadata, Format, Args) when is_list(Metadata) -> + lager:critical(Metadata, Format, Args). diff --git a/src/emqx_modules.erl b/src/emqx_modules.erl index a7407abcc..31dbd3c4a 100644 --- a/src/emqx_modules.erl +++ b/src/emqx_modules.erl @@ -23,11 +23,11 @@ load() -> fun({Mod, Env}) -> ok = Mod:load(Env), io:format("Load ~s module successfully.~n", [Mod]) - end, emqx_conf:get_env(modules, [])). + end, emqx_config:get_env(modules, [])). unload() -> lists:foreach( fun({Mod, Env}) -> Mod:unload(Env) end, - emqx_conf:get_env(modules, [])). + emqx_config:get_env(modules, [])). diff --git a/src/emqx_mqtt.erl b/src/emqx_mqtt.erl index 664fc88a0..d7a7281cf 100644 --- a/src/emqx_mqtt.erl +++ b/src/emqx_mqtt.erl @@ -40,7 +40,7 @@ shutdown() -> %% @doc Start Listeners. -spec(start_listeners() -> ok). start_listeners() -> - lists:foreach(fun start_listener/1, emqx_conf:get_env(listeners, [])). + lists:foreach(fun start_listener/1, emqx_config:get_env(listeners, [])). %% Start mqtt listener -spec(start_listener(listener()) -> {ok, pid()} | {error, any()}). @@ -60,7 +60,8 @@ start_listener({Proto, ListenOn, Opts}) when Proto == https; Proto == wss -> {ok, _} = mochiweb:start_http('mqtt:wss', ListenOn, Opts, {emqx_ws, handle_request, []}). start_listener(Proto, ListenOn, Opts) -> - Env = lists:append(emqx_conf:get_env(client, []), emqx_conf:get_env(protocol, [])), + Env = lists:append(emqx_config:get_env(client, []), + emqx_config:get_env(protocol, [])), MFArgs = {emqx_connection, start_link, [Env]}, {ok, _} = esockd:open(Proto, ListenOn, merge_sockopts(Opts), MFArgs). @@ -76,7 +77,7 @@ is_mqtt(_Proto) -> false. %% @doc Stop Listeners -spec(stop_listeners() -> ok). stop_listeners() -> - lists:foreach(fun stop_listener/1, emqx_conf:get_env(listeners, [])). + lists:foreach(fun stop_listener/1, emqx_config:get_env(listeners, [])). -spec(stop_listener(listener()) -> ok | {error, any()}). stop_listener({tcp, ListenOn, _Opts}) -> @@ -96,7 +97,7 @@ stop_listener({Proto, ListenOn, _Opts}) -> -spec(restart_listeners() -> ok). restart_listeners() -> lists:foreach(fun restart_listener/1, - emqx_conf:get_env(listeners, [])). + emqx_config:get_env(listeners, [])). -spec(restart_listener(listener()) -> any()). restart_listener({tcp, ListenOn, _Opts}) -> diff --git a/src/emqx_plugins.erl b/src/emqx_plugins.erl index 9e5f0dff3..3d40d1849 100644 --- a/src/emqx_plugins.erl +++ b/src/emqx_plugins.erl @@ -31,7 +31,7 @@ %% @doc Init plugins' config -spec(init() -> ok). init() -> - case emqx_conf:get_env(plugins_etc_dir) of + case emqx_config:get_env(plugins_etc_dir) of {ok, PluginsEtc} -> CfgFiles = [filename:join(PluginsEtc, File) || File <- filelib:wildcard("*.config", PluginsEtc)], @@ -50,7 +50,7 @@ init_config(CfgFile) -> -spec(load() -> list() | {error, term()}). load() -> load_expand_plugins(), - case emqx_conf:get_env(plugins_loaded_file) of + case emqx_config:get_env(plugins_loaded_file) of {ok, File} -> ensure_file(File), with_loaded_file(File, fun(Names) -> load_plugins(Names, false) end); @@ -60,7 +60,7 @@ load() -> end. load_expand_plugins() -> - case emqx_conf:get_env(expand_plugins_dir) of + case emqx_config:get_env(expand_plugins_dir) of {ok, Dir} -> PluginsDir = filelib:wildcard("*", Dir), lists:foreach(fun(PluginDir) -> @@ -101,7 +101,7 @@ init_expand_plugin_config(PluginDir) -> end, AppsEnv). get_expand_plugin_config() -> - case emqx_conf:get_env(expand_plugins_dir) of + case emqx_config:get_env(expand_plugins_dir) of {ok, Dir} -> PluginsDir = filelib:wildcard("*", Dir), lists:foldl(fun(PluginDir, Acc) -> @@ -144,7 +144,7 @@ load_plugins(Names, Persistent) -> %% @doc Unload all plugins before broker stopped. -spec(unload() -> list() | {error, term()}). unload() -> - case emqx_conf:get_env(plugins_loaded_file) of + case emqx_config:get_env(plugins_loaded_file) of {ok, File} -> with_loaded_file(File, fun stop_plugins/1); undefined -> @@ -158,7 +158,7 @@ stop_plugins(Names) -> %% @doc List all available plugins -spec(list() -> [plugin()]). list() -> - case emqx_conf:get_env(plugins_etc_dir) of + case emqx_config:get_env(plugins_etc_dir) of {ok, PluginsEtc} -> CfgFiles = filelib:wildcard("*.{conf,config}", PluginsEtc) ++ get_expand_plugin_config(), Plugins = [plugin(CfgFile) || CfgFile <- CfgFiles], @@ -313,7 +313,7 @@ plugin_unloaded(Name, true) -> end. read_loaded() -> - case emqx_conf:get_env(plugins_loaded_file) of + case emqx_config:get_env(plugins_loaded_file) of {ok, File} -> read_loaded(File); undefined -> {error, not_found} end. @@ -321,7 +321,7 @@ read_loaded() -> read_loaded(File) -> file:consult(File). write_loaded(AppNames) -> - {ok, File} = emqx_conf:get_env(plugins_loaded_file), + {ok, File} = emqx_config:get_env(plugins_loaded_file), case file:open(File, [binary, write]) of {ok, Fd} -> lists:foreach(fun(Name) -> diff --git a/src/emqx_protocol.erl b/src/emqx_protocol.erl index 0b0abb903..a86c19459 100644 --- a/src/emqx_protocol.erl +++ b/src/emqx_protocol.erl @@ -549,7 +549,7 @@ authenticate(Client, Password) -> %% PUBLISH ACL is cached in process dictionary. check_acl(publish, Topic, Client) -> - IfCache = emqx_conf:get_env(cache_acl, true), + IfCache = emqx_config:get_env(cache_acl, true), case {IfCache, get({acl, publish, Topic})} of {true, undefined} -> AllowDeny = emqx_access_control:check_acl(Client, publish, Topic), diff --git a/src/emqx_session.erl b/src/emqx_session.erl index 208af034b..f6f88de40 100644 --- a/src/emqx_session.erl +++ b/src/emqx_session.erl @@ -271,8 +271,8 @@ init(#{clean_start := CleanStart, process_flag(trap_exit, true), true = link(ClientPid), init_stats([deliver_msg, enqueue_msg]), - {ok, Env} = emqx_conf:get_env(session), - {ok, QEnv} = emqx_conf:get_env(mqueue), + {ok, Env} = emqx_config:get_env(session), + {ok, QEnv} = emqx_config:get_env(mqueue), MaxInflight = get_value(max_inflight, Env, 0), EnableStats = get_value(enable_stats, Env, false), IgnoreLoopDeliver = get_value(ignore_loop_deliver, Env, false), diff --git a/src/emqx_sysmon_sup.erl b/src/emqx_sysmon_sup.erl index bdaf1c48d..0c4566bc3 100644 --- a/src/emqx_sysmon_sup.erl +++ b/src/emqx_sysmon_sup.erl @@ -28,7 +28,7 @@ start_link() -> supervisor:start_link({local, ?MODULE}, ?MODULE, []). init([]) -> - {ok, Env} = emqx_conf:get_env(sysmon), + {ok, Env} = emqx_config:get_env(sysmon), Sysmon = {sysmon, {emqx_sysmon, start_link, [Env]}, permanent, 5000, worker, [emqx_sysmon]}, {ok, {{one_for_one, 10, 100}, [Sysmon]}}. diff --git a/src/emqx_tracer.erl b/src/emqx_tracer.erl index 510bd0439..1d0211e78 100644 --- a/src/emqx_tracer.erl +++ b/src/emqx_tracer.erl @@ -91,7 +91,7 @@ all_traces() -> %%-------------------------------------------------------------------- init([]) -> - {ok, #state{level = emqx_conf:get_env(trace_level, debug), traces = #{}}}. + {ok, #state{level = emqx_config:get_env(trace_level, debug), traces = #{}}}. handle_call({start_trace, Who, LogFile}, _From, State = #state{level = Level, traces = Traces}) -> case lager:trace_file(LogFile, [Who], Level, ?OPTIONS) of diff --git a/src/emqx_ws.erl b/src/emqx_ws.erl index 0fbea0639..b2bdc2e54 100644 --- a/src/emqx_ws.erl +++ b/src/emqx_ws.erl @@ -45,7 +45,7 @@ handle_request('GET', "/mqtt", Req) -> {true, "mqtt" ++ _Vsn} -> case Req:get(peername) of {ok, Peername} -> - {ok, ProtoEnv} = emqx_conf:get_env(protocol), + {ok, ProtoEnv} = emqx_config:get_env(protocol), PacketSize = get_value(max_packet_size, ProtoEnv, ?MAX_PACKET_SIZE), Parser = emqx_parser:initial_state(PacketSize), %% Upgrade WebSocket. @@ -72,11 +72,11 @@ handle_request(Method, Path, Req) -> Req:not_found(). is_websocket(Upgrade) -> - (not emqx_conf:get_env(websocket_check_upgrade_header, true)) orelse + (not emqx_config:get_env(websocket_check_upgrade_header, true)) orelse (Upgrade =/= undefined andalso string:to_lower(Upgrade) =:= "websocket"). check_protocol_header(Req) -> - case emqx_conf:get_env(websocket_protocol_header, false) of + case emqx_config:get_env(websocket_protocol_header, false) of true -> get_protocol_header(Req); false -> "mqtt-v3.1.1" end. diff --git a/src/emqx_ws_connection_sup.erl b/src/emqx_ws_connection_sup.erl index 5073c8002..b58e7c956 100644 --- a/src/emqx_ws_connection_sup.erl +++ b/src/emqx_ws_connection_sup.erl @@ -37,7 +37,7 @@ start_connection(WsPid, Req, ReplyChannel) -> init([]) -> %%TODO: Cannot upgrade the environments, Use zone? - Env = lists:append(emqx_conf:get_env(client, []), emqx_conf:get_env(protocol, [])), + Env = lists:append(emqx_config:get_env(client, []), emqx_config:get_env(protocol, [])), {ok, {{simple_one_for_one, 0, 1}, [{ws_connection, {emqx_ws_connection, start_link, [Env]}, temporary, 5000, worker, [emqx_ws_connection]}]}}.