refactor(config): remove emqx:get_env/1,2
This commit is contained in:
parent
b28435f811
commit
1433a7e0fe
|
@ -29,10 +29,6 @@
|
|||
, stop/0
|
||||
]).
|
||||
|
||||
-export([ get_env/1
|
||||
, get_env/2
|
||||
]).
|
||||
|
||||
%% PubSub API
|
||||
-export([ subscribe/1
|
||||
, subscribe/2
|
||||
|
@ -126,15 +122,6 @@ is_running(Node) ->
|
|||
Pid when is_pid(Pid) -> true
|
||||
end.
|
||||
|
||||
%% @doc Get environment
|
||||
-spec(get_env(Key :: atom()) -> maybe(term())).
|
||||
get_env(Key) ->
|
||||
get_env(Key, undefined).
|
||||
|
||||
-spec(get_env(Key :: atom(), Default :: term()) -> term()).
|
||||
get_env(Key, Default) ->
|
||||
application:get_env(?APP, Key, Default).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% PubSub API
|
||||
%%--------------------------------------------------------------------
|
||||
|
|
|
@ -69,7 +69,7 @@ stop(_State) ->
|
|||
andalso emqx_listeners:stop().
|
||||
|
||||
set_backtrace_depth() ->
|
||||
Depth = emqx:get_env(backtrace_depth, 16),
|
||||
Depth = emqx_config:get([node, backtrace_depth]),
|
||||
_ = erlang:system_flag(backtrace_depth, Depth),
|
||||
ok.
|
||||
|
||||
|
|
|
@ -25,5 +25,5 @@ is_enabled(Mod) ->
|
|||
(BootMods = boot_modules()) =:= all orelse lists:member(Mod, BootMods).
|
||||
|
||||
boot_modules() ->
|
||||
emqx:get_env(boot_modules, ?BOOT_MODULES).
|
||||
application:get_env(emqx, boot_modules, ?BOOT_MODULES).
|
||||
|
||||
|
|
|
@ -243,7 +243,7 @@ route(Routes, Delivery) ->
|
|||
do_route({To, Node}, Delivery) when Node =:= node() ->
|
||||
{Node, To, dispatch(To, Delivery)};
|
||||
do_route({To, Node}, Delivery) when is_atom(Node) ->
|
||||
{Node, To, forward(Node, To, Delivery, emqx:get_env(rpc_mode, async))};
|
||||
{Node, To, forward(Node, To, Delivery, emqx_config:get([rpc, mode]))};
|
||||
do_route({To, Group}, Delivery) when is_tuple(Group); is_binary(Group) ->
|
||||
{share, To, emqx_shared_sub:dispatch(Group, To, Delivery)}.
|
||||
|
||||
|
|
|
@ -62,5 +62,5 @@ unlock(ClientId) ->
|
|||
|
||||
-spec(strategy() -> local | leader | quorum | all).
|
||||
strategy() ->
|
||||
emqx:get_env(session_locking_strategy, quorum).
|
||||
emqx_config:get([broker, session_locking_strategy]).
|
||||
|
||||
|
|
|
@ -66,7 +66,7 @@ start_link() ->
|
|||
%% @doc Is the global registry enabled?
|
||||
-spec(is_enabled() -> boolean()).
|
||||
is_enabled() ->
|
||||
emqx:get_env(enable_session_registry, true).
|
||||
emqx_config:get([broker, enable_session_registry]).
|
||||
|
||||
%% @doc Register a global channel.
|
||||
-spec(register_channel(emqx_types:clientid()
|
||||
|
|
|
@ -216,10 +216,10 @@ load_config_file() ->
|
|||
lists:foldl(fun(ConfFile, Acc) ->
|
||||
{ok, RawConf} = hocon:load(ConfFile, #{format => richmap}),
|
||||
emqx_map_lib:deep_merge(Acc, RawConf)
|
||||
end, #{}, emqx:get_env(config_files, [])).
|
||||
end, #{}, application:get_env(emqx, config_files, [])).
|
||||
|
||||
emqx_override_conf_name() ->
|
||||
File = filename:join([emqx:get_env(data_dir), "emqx_override.conf"]),
|
||||
File = filename:join([emqx_config:get([node, data_dir]), "emqx_override.conf"]),
|
||||
ok = filelib:ensure_dir(File),
|
||||
File.
|
||||
|
||||
|
|
|
@ -85,7 +85,7 @@ code_change(_OldVsn, State, _Extra) ->
|
|||
%%--------------------------------------------------------------------
|
||||
|
||||
ensure_timer(State) ->
|
||||
case emqx:get_env(global_gc_interval) of
|
||||
case emqx_config:get([node, global_gc_interval]) of
|
||||
undefined -> State;
|
||||
Interval -> TRef = emqx_misc:start_timer(timer:seconds(Interval), run),
|
||||
State#{timer := TRef}
|
||||
|
|
|
@ -44,7 +44,7 @@
|
|||
%% @doc Load all plugins when the broker started.
|
||||
-spec(load() -> ok | ignore | {error, term()}).
|
||||
load() ->
|
||||
ok = load_ext_plugins(emqx:get_env(expand_plugins_dir)).
|
||||
ok = load_ext_plugins(emqx_config:get([plugins, expand_plugins_dir])).
|
||||
|
||||
%% @doc Load a Plugin
|
||||
-spec(load(atom()) -> ok | {error, term()}).
|
||||
|
|
|
@ -160,20 +160,20 @@ fields("node") ->
|
|||
sensitive => true,
|
||||
override_env => "EMQX_NODE_COOKIE"
|
||||
})}
|
||||
, {"data_dir", t(string(), "emqx.data_dir", undefined)}
|
||||
, {"data_dir", t(string(), undefined, undefined)}
|
||||
, {"config_files", t(list(string()), "emqx.config_files",
|
||||
[ filename:join([os:getenv("RUNNER_ETC_DIR"), "emqx.conf"])
|
||||
])}
|
||||
, {"global_gc_interval", t(duration_s(), "emqx.global_gc_interval", "15m")}
|
||||
, {"global_gc_interval", t(duration_s(), undefined, "15m")}
|
||||
, {"crash_dump_dir", t(file(), "vm_args.-env ERL_CRASH_DUMP", undefined)}
|
||||
, {"dist_net_ticktime", t(duration(), "vm_args.-kernel net_ticktime", "2m")}
|
||||
, {"dist_listen_min", t(range(1024, 65535), "kernel.inet_dist_listen_min", 6369)}
|
||||
, {"dist_listen_max", t(range(1024, 65535), "kernel.inet_dist_listen_max", 6369)}
|
||||
, {"backtrace_depth", t(integer(), "emqx.backtrace_depth", 23)}
|
||||
, {"backtrace_depth", t(integer(), undefined, 23)}
|
||||
];
|
||||
|
||||
fields("rpc") ->
|
||||
[ {"mode", t(union(sync, async), "emqx.rpc_mode", async)}
|
||||
[ {"mode", t(union(sync, async), undefined, async)}
|
||||
, {"async_batch_size", t(integer(), "gen_rpc.max_batch_size", 256)}
|
||||
, {"port_discovery",t(union(manual, stateless), "gen_rpc.port_discovery", stateless)}
|
||||
, {"tcp_server_port", t(integer(), "gen_rpc.tcp_server_port", 5369)}
|
||||
|
@ -471,7 +471,7 @@ fields("rule") ->
|
|||
[ {"$id", t(string())}];
|
||||
|
||||
fields("plugins") ->
|
||||
[ {"expand_plugins_dir", t(string(), "emqx.expand_plugins_dir", undefined)}
|
||||
[ {"expand_plugins_dir", t(string())}
|
||||
];
|
||||
|
||||
fields("broker") ->
|
||||
|
@ -487,7 +487,7 @@ fields("broker") ->
|
|||
|
||||
fields("perf") ->
|
||||
[ {"route_lock_type", t(union([key, tab, global]), undefined, key)}
|
||||
, {"trie_compaction", t(boolean(), "emqx.trie_compaction", true)}
|
||||
, {"trie_compaction", t(boolean(), undefined, true)}
|
||||
];
|
||||
|
||||
fields("sysmon") ->
|
||||
|
|
|
@ -137,11 +137,11 @@ dispatch(Group, Topic, Delivery = #delivery{message = Msg}, FailedSubs) ->
|
|||
|
||||
-spec(strategy() -> strategy()).
|
||||
strategy() ->
|
||||
emqx:get_env(shared_subscription_strategy, random).
|
||||
emqx_config:get([broker, shared_subscription_strategy]).
|
||||
|
||||
-spec(ack_enabled() -> boolean()).
|
||||
ack_enabled() ->
|
||||
emqx:get_env(shared_dispatch_ack_enabled, false).
|
||||
emqx_config:get([broker, shared_dispatch_ack_enabled]).
|
||||
|
||||
do_dispatch(SubPid, Topic, Msg, _Type) when SubPid =:= self() ->
|
||||
%% Deadlock otherwise
|
||||
|
|
|
@ -28,8 +28,6 @@
|
|||
-export([ insert/1
|
||||
, match/1
|
||||
, delete/1
|
||||
, put_compaction_flag/1
|
||||
, put_default_compaction_flag/0
|
||||
]).
|
||||
|
||||
-export([ empty/0
|
||||
|
@ -50,21 +48,12 @@
|
|||
, count = 0 :: non_neg_integer()
|
||||
}).
|
||||
|
||||
-define(IS_COMPACT, true).
|
||||
|
||||
-rlog_shard({?ROUTE_SHARD, ?TRIE}).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Mnesia bootstrap
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
put_compaction_flag(Bool) when is_boolean(Bool) ->
|
||||
_ = persistent_term:put({?MODULE, compaction}, Bool),
|
||||
ok.
|
||||
|
||||
put_default_compaction_flag() ->
|
||||
ok = put_compaction_flag(?IS_COMPACT).
|
||||
|
||||
%% @doc Create or replicate topics table.
|
||||
-spec(mnesia(boot | copy) -> ok).
|
||||
mnesia(boot) ->
|
||||
|
@ -279,16 +268,7 @@ match_compact([Word | Words], Prefix, IsWildcard, Acc0) ->
|
|||
lookup_topic(MlTopic).
|
||||
|
||||
is_compact() ->
|
||||
case persistent_term:get({?MODULE, compaction}, undefined) of
|
||||
undefined ->
|
||||
Default = ?IS_COMPACT,
|
||||
FromEnv = emqx:get_env(trie_compaction, Default),
|
||||
_ = put_compaction_flag(FromEnv),
|
||||
true = is_boolean(FromEnv),
|
||||
FromEnv;
|
||||
Value when is_boolean(Value) ->
|
||||
Value
|
||||
end.
|
||||
emqx_config:get([broker, perf, trie_compaction]).
|
||||
|
||||
-ifdef(TEST).
|
||||
-include_lib("eunit/include/eunit.hrl").
|
||||
|
@ -315,10 +295,11 @@ words(T) -> emqx_topic:words(T).
|
|||
|
||||
make_prefixes_t(Topic) -> make_prefixes(words(Topic)).
|
||||
|
||||
with_compact_flag(IsCmopact, F) ->
|
||||
put_compaction_flag(IsCmopact),
|
||||
with_compact_flag(IsCompact, F) ->
|
||||
OldV = is_compact(),
|
||||
set_compact(IsCompact),
|
||||
try F()
|
||||
after put_default_compaction_flag()
|
||||
after set_compact(OldV)
|
||||
end.
|
||||
|
||||
make_prefixes_test_() ->
|
||||
|
@ -344,6 +325,9 @@ do_compact_test() ->
|
|||
do_compact(words(<<"a/+/+/+/+/b">>))),
|
||||
ok.
|
||||
|
||||
set_compact(Bool) ->
|
||||
emqx_config:put([broker, perf, trie_compaction], Bool).
|
||||
|
||||
clear_tables() -> ekka_mnesia:clear_table(?TRIE).
|
||||
|
||||
-endif. % TEST
|
||||
|
|
|
@ -52,14 +52,6 @@ t_stop_start(_) ->
|
|||
ok = emqx:shutdown(for_test),
|
||||
false = emqx:is_running(node()).
|
||||
|
||||
t_get_env(_) ->
|
||||
?assertEqual(undefined, emqx:get_env(undefined_key)),
|
||||
?assertEqual(default_value, emqx:get_env(undefined_key, default_value)),
|
||||
application:set_env(emqx, undefined_key, hello),
|
||||
?assertEqual(hello, emqx:get_env(undefined_key)),
|
||||
?assertEqual(hello, emqx:get_env(undefined_key, default_value)),
|
||||
application:unset_env(emqx, undefined_key).
|
||||
|
||||
t_emqx_pubsub_api(_) ->
|
||||
true = emqx:is_running(node()),
|
||||
{ok, C} = emqtt:start_link([{host, "localhost"}, {clientid, "myclient"}]),
|
||||
|
|
|
@ -45,7 +45,7 @@
|
|||
-spec save_files_return_opts(opts_input(), atom() | string() | binary(),
|
||||
string() | binary()) -> opts().
|
||||
save_files_return_opts(Options, SubDir, ResId) ->
|
||||
Dir = filename:join([emqx:get_env(data_dir), SubDir, ResId]),
|
||||
Dir = filename:join([emqx_config:get([node, data_dir]), SubDir, ResId]),
|
||||
save_files_return_opts(Options, Dir).
|
||||
|
||||
%% @doc Parse ssl options input.
|
||||
|
@ -76,7 +76,7 @@ save_files_return_opts(Options, Dir) ->
|
|||
%% empty string is returned if the input is empty.
|
||||
-spec save_file(file_input(), atom() | string() | binary()) -> string().
|
||||
save_file(Param, SubDir) ->
|
||||
Dir = filename:join([emqx:get_env(data_dir), SubDir]),
|
||||
Dir = filename:join([emqx_config:get([node, data_dir]), SubDir]),
|
||||
do_save_file(Param, Dir).
|
||||
|
||||
filter([]) -> [];
|
||||
|
|
|
@ -506,7 +506,7 @@ connect(Options) when is_list(Options) ->
|
|||
connect(Options = #{disk_cache := DiskCache, ecpool_worker_id := Id, pool_name := Pool}) ->
|
||||
Options0 = case DiskCache of
|
||||
true ->
|
||||
DataDir = filename:join([emqx:get_env(data_dir), replayq, Pool, integer_to_list(Id)]),
|
||||
DataDir = filename:join([emqx_config:get([node, data_dir]), replayq, Pool, integer_to_list(Id)]),
|
||||
QueueOption = #{replayq_dir => DataDir},
|
||||
Options#{queue => QueueOption};
|
||||
false ->
|
||||
|
|
Loading…
Reference in New Issue