Merge remote-tracking branch 'origin/develop'

This commit is contained in:
zhanghongtong 2019-09-10 12:54:27 +08:00
commit 36806153c7
16 changed files with 49 additions and 107 deletions

View File

@ -29,16 +29,18 @@
, stop/0
]).
-export([ get_env/1
, get_env/2
]).
%% PubSub API
-export([ subscribe/1
, subscribe/2
, subscribe/3
, publish/1
, unsubscribe/1
]).
-export([publish/1]).
-export([unsubscribe/1]).
%% PubSub management API
-export([ topics/0
, subscriptions/1
@ -101,6 +103,15 @@ is_running(Node) ->
Pid when is_pid(Pid) -> true
end.
%% @doc Get environment
-spec(get_env(Key :: atom()) -> maybe(term())).
get_env(Key) ->
get_env(Key, undefined).
-spec(get_env(Key :: atom(), Default :: term()) -> term()).
get_env(Key, Default) ->
application:get_env(?APP, Key, Default).
%%--------------------------------------------------------------------
%% PubSub API
%%--------------------------------------------------------------------

View File

@ -236,7 +236,7 @@ route(Routes, Delivery) ->
do_route({To, Node}, Delivery) when Node =:= node() ->
{Node, To, dispatch(To, Delivery)};
do_route({To, Node}, Delivery) when is_atom(Node) ->
{Node, To, forward(Node, To, Delivery, emqx_config:get_env(rpc_mode, async))};
{Node, To, forward(Node, To, Delivery, emqx:get_env(rpc_mode, async))};
do_route({To, Group}, Delivery) when is_tuple(Group); is_binary(Group) ->
{share, To, emqx_shared_sub:dispatch(Group, To, Delivery)}.

View File

@ -62,5 +62,5 @@ unlock(ClientId) ->
-spec(strategy() -> local | one | quorum | all).
strategy() ->
emqx_config:get_env(session_locking_strategy, quorum).
emqx:get_env(session_locking_strategy, quorum).

View File

@ -62,7 +62,7 @@ start_link() ->
%% @doc Is the global registry enabled?
-spec(is_enabled() -> boolean()).
is_enabled() ->
emqx_config:get_env(enable_channel_registry, true).
emqx:get_env(enable_channel_registry, true).
%% @doc Register a global channel.
-spec(register_channel(emqx_types:client_id()

View File

@ -1,32 +0,0 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2019 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_config).
-export([ get_env/1
, get_env/2
]).
-define(APP, emqx).
%% @doc Get environment
-spec(get_env(Key :: atom()) -> term() | undefined).
get_env(Key) ->
get_env(Key, undefined).
-spec(get_env(Key :: atom(), Default :: term()) -> term()).
get_env(Key, Default) ->
application:get_env(?APP, Key, Default).

View File

@ -105,7 +105,7 @@ start_link() ->
gen_statem:start_link({local, ?MODULE}, ?MODULE, [], []).
init([]) ->
Interval = emqx_config:get_env(flapping_clean_interval, ?default_flapping_clean_interval),
Interval = emqx:get_env(flapping_clean_interval, ?default_flapping_clean_interval),
TabOpts = [ public
, set
, {keypos, 2}

View File

@ -42,7 +42,7 @@
%% @doc Start all listeners.
-spec(start() -> ok).
start() ->
lists:foreach(fun start_listener/1, emqx_config:get_env(listeners, [])).
lists:foreach(fun start_listener/1, emqx:get_env(listeners, [])).
-spec(start_listener(listener()) -> {ok, pid()} | {error, term()}).
start_listener({Proto, ListenOn, Options}) ->
@ -113,7 +113,7 @@ with_port({Addr, Port}, Opts = #{socket_opts := SocketOption}) ->
%% @doc Restart all listeners
-spec(restart() -> ok).
restart() ->
lists:foreach(fun restart_listener/1, emqx_config:get_env(listeners, [])).
lists:foreach(fun restart_listener/1, emqx:get_env(listeners, [])).
-spec(restart_listener(listener()) -> any()).
restart_listener({Proto, ListenOn, Options}) ->
@ -136,7 +136,7 @@ restart_listener(Proto, ListenOn, _Opts) ->
%% @doc Stop all listeners.
-spec(stop() -> ok).
stop() ->
lists:foreach(fun stop_listener/1, emqx_config:get_env(listeners, [])).
lists:foreach(fun stop_listener/1, emqx:get_env(listeners, [])).
-spec(stop_listener(listener()) -> ok | {error, term()}).
stop_listener({Proto, ListenOn, Opts}) ->

View File

@ -79,8 +79,7 @@ reload_acl() ->
%% Internal Functions
%%--------------------------------------------------------------------
acl_file() ->
emqx_config:get_env(acl_file).
acl_file() -> emqx:get_env(acl_file).
lookup(PubSub, Rules) ->
maps:get(PubSub, Rules, []).

View File

@ -34,8 +34,7 @@ load({Mod, Env}) ->
ok = Mod:load(Env),
?LOG(info, "Load ~s module successfully.", [Mod]).
modules() ->
emqx_config:get_env(modules, []).
modules() -> emqx:get_env(modules, []).
%% @doc Unload all the extended modules.
-spec(unload() -> ok).

View File

@ -39,7 +39,7 @@
%% @doc Init plugins' config
-spec(init() -> ok).
init() ->
case emqx_config:get_env(plugins_etc_dir) of
case emqx:get_env(plugins_etc_dir) of
undefined -> ok;
PluginsEtc ->
CfgFiles = [filename:join(PluginsEtc, File) ||
@ -57,16 +57,15 @@ init_config(CfgFile) ->
-spec(load() -> list() | {error, term()}).
load() ->
load_expand_plugins(),
case emqx_config:get_env(plugins_loaded_file) of
undefined -> %% No plugins available
ignore;
case emqx:get_env(plugins_loaded_file) of
undefined -> ignore; %% No plugins available
File ->
ensure_file(File),
with_loaded_file(File, fun(Names) -> load_plugins(Names, false) end)
end.
load_expand_plugins() ->
case emqx_config:get_env(expand_plugins_dir) of
case emqx:get_env(expand_plugins_dir) of
undefined -> ok;
ExpandPluginsDir ->
Plugins = filelib:wildcard("*", ExpandPluginsDir),
@ -138,9 +137,8 @@ load_plugins(Names, Persistent) ->
%% @doc Unload all plugins before broker stopped.
-spec(unload() -> list() | {error, term()}).
unload() ->
case emqx_config:get_env(plugins_loaded_file) of
undefined ->
ignore;
case emqx:get_env(plugins_loaded_file) of
undefined -> ignore;
File ->
with_loaded_file(File, fun stop_plugins/1)
end.
@ -300,7 +298,7 @@ plugin_unloaded(Name, true) ->
end.
read_loaded() ->
case emqx_config:get_env(plugins_loaded_file) of
case emqx:get_env(plugins_loaded_file) of
undefined -> {error, not_found};
File -> read_loaded(File)
end.
@ -308,7 +306,7 @@ read_loaded() ->
read_loaded(File) -> file:consult(File).
write_loaded(AppNames) ->
FilePath = emqx_config:get_env(plugins_loaded_file),
FilePath = emqx:get_env(plugins_loaded_file),
case file:write_file(FilePath, [io_lib:format("~p.~n", [Name]) || Name <- AppNames]) of
ok -> ok;
{error, Error} ->

View File

@ -127,11 +127,11 @@ dispatch(Group, Topic, Delivery = #delivery{message = Msg}, FailedSubs) ->
-spec(strategy() -> random | round_robin | sticky | hash).
strategy() ->
emqx_config:get_env(shared_subscription_strategy, round_robin).
emqx:get_env(shared_subscription_strategy, round_robin).
-spec(ack_enabled() -> boolean()).
ack_enabled() ->
emqx_config:get_env(shared_dispatch_ack_enabled, false).
emqx:get_env(shared_dispatch_ack_enabled, false).
do_dispatch(SubPid, Topic, Msg, _Type) when SubPid =:= self() ->
%% Deadlock otherwise

View File

@ -111,12 +111,12 @@ datetime() ->
%% @doc Get sys interval
-spec(sys_interval() -> pos_integer()).
sys_interval() ->
emqx_config:get_env(broker_sys_interval, 60000).
emqx:get_env(broker_sys_interval, 60000).
%% @doc Get sys heatbeat interval
-spec(sys_heatbeat_interval() -> pos_integer()).
sys_heatbeat_interval() ->
emqx_config:get_env(broker_sys_heartbeat, 30000).
emqx:get_env(broker_sys_heartbeat, 30000).
%% @doc Get sys info
-spec(info() -> list(tuple())).

View File

@ -48,6 +48,5 @@ child_spec(Mod, Args) ->
modules => [Mod]
}.
config(Name) ->
emqx_config:get_env(Name, []).
config(Name) -> emqx:get_env(Name, []).

View File

@ -66,18 +66,17 @@ start_link() ->
gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
-spec(get_env(maybe(zone()), atom()) -> maybe(term())).
get_env(undefined, Key) ->
emqx_config:get_env(Key);
get_env(undefined, Key) -> emqx:get_env(Key);
get_env(Zone, Key) ->
get_env(Zone, Key, undefined).
-spec(get_env(maybe(zone()), atom(), term()) -> maybe(term())).
get_env(undefined, Key, Def) ->
emqx_config:get_env(Key, Def);
emqx:get_env(Key, Def);
get_env(Zone, Key, Def) ->
try persistent_term:get(?KEY(Zone, Key))
catch error:badarg ->
emqx_config:get_env(Key, Def)
emqx:get_env(Key, Def)
end.
-spec(set_env(zone(), atom(), term()) -> ok).
@ -132,5 +131,5 @@ code_change(_OldVsn, State, _Extra) ->
do_reload() ->
[persistent_term:put(?KEY(Zone, Key), Val)
|| {Zone, Opts} <- emqx_config:get_env(zones, []), {Key, Val} <- Opts].
|| {Zone, Opts} <- emqx:get_env(zones, []), {Key, Val} <- Opts].

View File

@ -32,6 +32,14 @@ init_per_suite(Config) ->
end_per_suite(_Config) ->
emqx_ct_helpers:stop_apps([]).
t_get_env(_) ->
?assertEqual(undefined, emqx:get_env(undefined_key)),
?assertEqual(default_value, emqx:get_env(undefined_key, default_value)),
application:set_env(emqx, undefined_key, hello),
?assertEqual(hello, emqx:get_env(undefined_key)),
?assertEqual(hello, emqx:get_env(undefined_key, default_value)),
application:unset_env(emqx, undefined_key).
t_emqx_pubsub_api(_) ->
emqx:start(),
true = emqx:is_running(node()),

View File

@ -1,39 +0,0 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2019 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_config_SUITE).
-compile(export_all).
-compile(nowarn_export_all).
-include_lib("eunit/include/eunit.hrl").
all() -> emqx_ct:all(?MODULE).
init_per_suite(Config) ->
emqx_ct_helpers:start_apps([]),
Config.
end_per_suite(_Config) ->
emqx_ct_helpers:stop_apps([]).
t_get_env(_) ->
?assertEqual(undefined, emqx_config:get_env(undefined_key)),
?assertEqual(default_value, emqx_config:get_env(undefined_key, default_value)),
application:set_env(emqx, undefined_key, hello),
?assertEqual(hello, emqx_config:get_env(undefined_key)),
?assertEqual(hello, emqx_config:get_env(undefined_key, default_value)),
application:unset_env(emqx, undefined_key).