refactor(config): move emqx_config:get/get_raw to emqx:get_config/get_raw_config (#5517)
This commit is contained in:
parent
5f6bcd1ebb
commit
e8e95d39ef
|
@ -55,7 +55,12 @@
|
||||||
-export([ set_debug_secret/1
|
-export([ set_debug_secret/1
|
||||||
]).
|
]).
|
||||||
|
|
||||||
-export([ update_config/2
|
%% Configs APIs
|
||||||
|
-export([ get_config/1
|
||||||
|
, get_config/2
|
||||||
|
, get_raw_config/1
|
||||||
|
, get_raw_config/2
|
||||||
|
, update_config/2
|
||||||
, update_config/3
|
, update_config/3
|
||||||
, remove_config/1
|
, remove_config/1
|
||||||
, remove_config/2
|
, remove_config/2
|
||||||
|
@ -192,6 +197,22 @@ run_hook(HookPoint, Args) ->
|
||||||
run_fold_hook(HookPoint, Args, Acc) ->
|
run_fold_hook(HookPoint, Args, Acc) ->
|
||||||
emqx_hooks:run_fold(HookPoint, Args, Acc).
|
emqx_hooks:run_fold(HookPoint, Args, Acc).
|
||||||
|
|
||||||
|
-spec get_config(emqx_map_lib:config_key_path()) -> term().
|
||||||
|
get_config(KeyPath) ->
|
||||||
|
emqx_config:get(KeyPath).
|
||||||
|
|
||||||
|
-spec get_config(emqx_map_lib:config_key_path(), term()) -> term().
|
||||||
|
get_config(KeyPath, Default) ->
|
||||||
|
emqx_config:get(KeyPath, Default).
|
||||||
|
|
||||||
|
-spec get_raw_config(emqx_map_lib:config_key_path()) -> term().
|
||||||
|
get_raw_config(KeyPath) ->
|
||||||
|
emqx_config:get_raw(KeyPath).
|
||||||
|
|
||||||
|
-spec get_raw_config(emqx_map_lib:config_key_path(), term()) -> term().
|
||||||
|
get_raw_config(KeyPath, Default) ->
|
||||||
|
emqx_config:get_raw(KeyPath, Default).
|
||||||
|
|
||||||
-spec update_config(emqx_map_lib:config_key_path(), emqx_config:update_request()) ->
|
-spec update_config(emqx_map_lib:config_key_path(), emqx_config:update_request()) ->
|
||||||
{ok, emqx_config:update_result()} | {error, emqx_config:update_error()}.
|
{ok, emqx_config:update_result()} | {error, emqx_config:update_error()}.
|
||||||
update_config(KeyPath, UpdateReq) ->
|
update_config(KeyPath, UpdateReq) ->
|
||||||
|
|
|
@ -199,7 +199,7 @@ handle_call({activate_alarm, Name, Details}, _From, State) ->
|
||||||
message = normalize_message(Name, Details),
|
message = normalize_message(Name, Details),
|
||||||
activate_at = erlang:system_time(microsecond)},
|
activate_at = erlang:system_time(microsecond)},
|
||||||
ekka_mnesia:dirty_write(?ACTIVATED_ALARM, Alarm),
|
ekka_mnesia:dirty_write(?ACTIVATED_ALARM, Alarm),
|
||||||
do_actions(activate, Alarm, emqx_config:get([alarm, actions])),
|
do_actions(activate, Alarm, emqx:get_config([alarm, actions])),
|
||||||
{reply, ok, State}
|
{reply, ok, State}
|
||||||
end;
|
end;
|
||||||
|
|
||||||
|
@ -268,11 +268,11 @@ code_change(_OldVsn, State, _Extra) ->
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
|
|
||||||
get_validity_period() ->
|
get_validity_period() ->
|
||||||
emqx_config:get([alarm, validity_period]).
|
emqx:get_config([alarm, validity_period]).
|
||||||
|
|
||||||
deactivate_alarm(Details, #activated_alarm{activate_at = ActivateAt, name = Name,
|
deactivate_alarm(Details, #activated_alarm{activate_at = ActivateAt, name = Name,
|
||||||
details = Details0, message = Msg0}) ->
|
details = Details0, message = Msg0}) ->
|
||||||
SizeLimit = emqx_config:get([alarm, size_limit]),
|
SizeLimit = emqx:get_config([alarm, size_limit]),
|
||||||
case SizeLimit > 0 andalso (mnesia:table_info(?DEACTIVATED_ALARM, size) >= SizeLimit) of
|
case SizeLimit > 0 andalso (mnesia:table_info(?DEACTIVATED_ALARM, size) >= SizeLimit) of
|
||||||
true ->
|
true ->
|
||||||
case mnesia:dirty_first(?DEACTIVATED_ALARM) of
|
case mnesia:dirty_first(?DEACTIVATED_ALARM) of
|
||||||
|
@ -289,7 +289,7 @@ deactivate_alarm(Details, #activated_alarm{activate_at = ActivateAt, name = Name
|
||||||
erlang:system_time(microsecond)),
|
erlang:system_time(microsecond)),
|
||||||
ekka_mnesia:dirty_write(?DEACTIVATED_ALARM, HistoryAlarm),
|
ekka_mnesia:dirty_write(?DEACTIVATED_ALARM, HistoryAlarm),
|
||||||
ekka_mnesia:dirty_delete(?ACTIVATED_ALARM, Name),
|
ekka_mnesia:dirty_delete(?ACTIVATED_ALARM, Name),
|
||||||
do_actions(deactivate, DeActAlarm, emqx_config:get([alarm, actions])).
|
do_actions(deactivate, DeActAlarm, emqx:get_config([alarm, actions])).
|
||||||
|
|
||||||
make_deactivated_alarm(ActivateAt, Name, Details, Message, DeActivateAt) ->
|
make_deactivated_alarm(ActivateAt, Name, Details, Message, DeActivateAt) ->
|
||||||
#deactivated_alarm{
|
#deactivated_alarm{
|
||||||
|
|
|
@ -242,7 +242,7 @@ route(Routes, Delivery) ->
|
||||||
do_route({To, Node}, Delivery) when Node =:= node() ->
|
do_route({To, Node}, Delivery) when Node =:= node() ->
|
||||||
{Node, To, dispatch(To, Delivery)};
|
{Node, To, dispatch(To, Delivery)};
|
||||||
do_route({To, Node}, Delivery) when is_atom(Node) ->
|
do_route({To, Node}, Delivery) when is_atom(Node) ->
|
||||||
{Node, To, forward(Node, To, Delivery, emqx_config:get([rpc, mode]))};
|
{Node, To, forward(Node, To, Delivery, emqx:get_config([rpc, mode]))};
|
||||||
do_route({To, Group}, Delivery) when is_tuple(Group); is_binary(Group) ->
|
do_route({To, Group}, Delivery) when is_tuple(Group); is_binary(Group) ->
|
||||||
{share, To, emqx_shared_sub:dispatch(Group, To, Delivery)}.
|
{share, To, emqx_shared_sub:dispatch(Group, To, Delivery)}.
|
||||||
|
|
||||||
|
|
|
@ -62,5 +62,5 @@ unlock(ClientId) ->
|
||||||
|
|
||||||
-spec(strategy() -> local | leader | quorum | all).
|
-spec(strategy() -> local | leader | quorum | all).
|
||||||
strategy() ->
|
strategy() ->
|
||||||
emqx_config:get([broker, session_locking_strategy]).
|
emqx:get_config([broker, session_locking_strategy]).
|
||||||
|
|
||||||
|
|
|
@ -65,7 +65,7 @@ start_link() ->
|
||||||
%% @doc Is the global registry enabled?
|
%% @doc Is the global registry enabled?
|
||||||
-spec(is_enabled() -> boolean()).
|
-spec(is_enabled() -> boolean()).
|
||||||
is_enabled() ->
|
is_enabled() ->
|
||||||
emqx_config:get([broker, enable_session_registry]).
|
emqx:get_config([broker, enable_session_registry]).
|
||||||
|
|
||||||
%% @doc Register a global channel.
|
%% @doc Register a global channel.
|
||||||
-spec(register_channel(emqx_types:clientid()
|
-spec(register_channel(emqx_types:clientid()
|
||||||
|
|
|
@ -43,6 +43,12 @@
|
||||||
, put/2
|
, put/2
|
||||||
]).
|
]).
|
||||||
|
|
||||||
|
-export([ get_raw/1
|
||||||
|
, get_raw/2
|
||||||
|
, put_raw/1
|
||||||
|
, put_raw/2
|
||||||
|
]).
|
||||||
|
|
||||||
-export([ save_schema_mod_and_names/1
|
-export([ save_schema_mod_and_names/1
|
||||||
, get_schema_mod/0
|
, get_schema_mod/0
|
||||||
, get_schema_mod/1
|
, get_schema_mod/1
|
||||||
|
@ -61,12 +67,6 @@
|
||||||
, find_listener_conf/3
|
, find_listener_conf/3
|
||||||
]).
|
]).
|
||||||
|
|
||||||
-export([ get_raw/1
|
|
||||||
, get_raw/2
|
|
||||||
, put_raw/1
|
|
||||||
, put_raw/2
|
|
||||||
]).
|
|
||||||
|
|
||||||
-define(CONF, conf).
|
-define(CONF, conf).
|
||||||
-define(RAW_CONF, raw_conf).
|
-define(RAW_CONF, raw_conf).
|
||||||
-define(PERSIS_SCHEMA_MODS, {?MODULE, schema_mods}).
|
-define(PERSIS_SCHEMA_MODS, {?MODULE, schema_mods}).
|
||||||
|
|
|
@ -905,7 +905,7 @@ get_state(Pid) ->
|
||||||
tl(tuple_to_list(State)))).
|
tl(tuple_to_list(State)))).
|
||||||
|
|
||||||
get_active_n(Zone, Listener) ->
|
get_active_n(Zone, Listener) ->
|
||||||
case emqx_config:get([zones, Zone, listeners, Listener, type]) of
|
case emqx:get_config([zones, Zone, listeners, Listener, type]) of
|
||||||
quic -> 100;
|
quic -> 100;
|
||||||
_ -> emqx_config:get_listener_conf(Zone, Listener, [tcp, active_n])
|
_ -> emqx_config:get_listener_conf(Zone, Listener, [tcp, active_n])
|
||||||
end.
|
end.
|
||||||
|
|
|
@ -160,4 +160,4 @@ start_timer(Zone) ->
|
||||||
start_timers() ->
|
start_timers() ->
|
||||||
lists:foreach(fun({Zone, _ZoneConf}) ->
|
lists:foreach(fun({Zone, _ZoneConf}) ->
|
||||||
start_timer(Zone)
|
start_timer(Zone)
|
||||||
end, maps:to_list(emqx_config:get([zones], #{}))).
|
end, maps:to_list(emqx:get_config([zones], #{}))).
|
||||||
|
|
|
@ -85,7 +85,7 @@ code_change(_OldVsn, State, _Extra) ->
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
ensure_timer(State) ->
|
ensure_timer(State) ->
|
||||||
case emqx_config:get([node, global_gc_interval]) of
|
case emqx:get_config([node, global_gc_interval]) of
|
||||||
undefined -> State;
|
undefined -> State;
|
||||||
Interval -> TRef = emqx_misc:start_timer(Interval, run),
|
Interval -> TRef = emqx_misc:start_timer(Interval, run),
|
||||||
State#{timer := TRef}
|
State#{timer := TRef}
|
||||||
|
|
|
@ -43,7 +43,7 @@ list() ->
|
||||||
[{listener_id(ZoneName, LName), LConf} || {ZoneName, LName, LConf} <- do_list()].
|
[{listener_id(ZoneName, LName), LConf} || {ZoneName, LName, LConf} <- do_list()].
|
||||||
|
|
||||||
do_list() ->
|
do_list() ->
|
||||||
Zones = maps:to_list(emqx_config:get([zones], #{})),
|
Zones = maps:to_list(emqx:get_config([zones], #{})),
|
||||||
lists:append([list(ZoneName, ZoneConf) || {ZoneName, ZoneConf} <- Zones]).
|
lists:append([list(ZoneName, ZoneConf) || {ZoneName, ZoneConf} <- Zones]).
|
||||||
|
|
||||||
list(ZoneName, ZoneConf) ->
|
list(ZoneName, ZoneConf) ->
|
||||||
|
|
|
@ -76,7 +76,7 @@ set_procmem_high_watermark(Float) ->
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
init([]) ->
|
init([]) ->
|
||||||
Opts = emqx_config:get([sysmon, os]),
|
Opts = emqx:get_config([sysmon, os]),
|
||||||
set_mem_check_interval(maps:get(mem_check_interval, Opts)),
|
set_mem_check_interval(maps:get(mem_check_interval, Opts)),
|
||||||
set_sysmem_high_watermark(maps:get(sysmem_high_watermark, Opts)),
|
set_sysmem_high_watermark(maps:get(sysmem_high_watermark, Opts)),
|
||||||
set_procmem_high_watermark(maps:get(procmem_high_watermark, Opts)),
|
set_procmem_high_watermark(maps:get(procmem_high_watermark, Opts)),
|
||||||
|
@ -91,8 +91,8 @@ handle_cast(Msg, State) ->
|
||||||
{noreply, State}.
|
{noreply, State}.
|
||||||
|
|
||||||
handle_info({timeout, _Timer, check}, State) ->
|
handle_info({timeout, _Timer, check}, State) ->
|
||||||
CPUHighWatermark = emqx_config:get([sysmon, os, cpu_high_watermark]) * 100,
|
CPUHighWatermark = emqx:get_config([sysmon, os, cpu_high_watermark]) * 100,
|
||||||
CPULowWatermark = emqx_config:get([sysmon, os, cpu_low_watermark]) * 100,
|
CPULowWatermark = emqx:get_config([sysmon, os, cpu_low_watermark]) * 100,
|
||||||
_ = case emqx_vm:cpu_util() of %% TODO: should be improved?
|
_ = case emqx_vm:cpu_util() of %% TODO: should be improved?
|
||||||
0 -> ok;
|
0 -> ok;
|
||||||
Busy when Busy >= CPUHighWatermark ->
|
Busy when Busy >= CPUHighWatermark ->
|
||||||
|
@ -123,7 +123,7 @@ code_change(_OldVsn, State, _Extra) ->
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
start_check_timer() ->
|
start_check_timer() ->
|
||||||
Interval = emqx_config:get([sysmon, os, cpu_check_interval]),
|
Interval = emqx:get_config([sysmon, os, cpu_check_interval]),
|
||||||
case erlang:system_info(system_architecture) of
|
case erlang:system_info(system_architecture) of
|
||||||
"x86_64-pc-linux-musl" -> ok;
|
"x86_64-pc-linux-musl" -> ok;
|
||||||
_ -> emqx_misc:start_timer(Interval, check)
|
_ -> emqx_misc:start_timer(Interval, check)
|
||||||
|
|
|
@ -43,7 +43,7 @@
|
||||||
%% @doc Load all plugins when the broker started.
|
%% @doc Load all plugins when the broker started.
|
||||||
-spec(load() -> ok | ignore | {error, term()}).
|
-spec(load() -> ok | ignore | {error, term()}).
|
||||||
load() ->
|
load() ->
|
||||||
ok = load_ext_plugins(emqx_config:get([plugins, expand_plugins_dir], undefined)).
|
ok = load_ext_plugins(emqx:get_config([plugins, expand_plugins_dir], undefined)).
|
||||||
|
|
||||||
%% @doc Load a Plugin
|
%% @doc Load a Plugin
|
||||||
-spec(load(atom()) -> ok | {error, term()}).
|
-spec(load(atom()) -> ok | {error, term()}).
|
||||||
|
|
|
@ -250,7 +250,7 @@ delete_trie_route(Route = #route{topic = Topic}) ->
|
||||||
%% @private
|
%% @private
|
||||||
-spec(maybe_trans(function(), list(any())) -> ok | {error, term()}).
|
-spec(maybe_trans(function(), list(any())) -> ok | {error, term()}).
|
||||||
maybe_trans(Fun, Args) ->
|
maybe_trans(Fun, Args) ->
|
||||||
case emqx_config:get([broker, perf, route_lock_type]) of
|
case emqx:get_config([broker, perf, route_lock_type]) of
|
||||||
key ->
|
key ->
|
||||||
trans(Fun, Args);
|
trans(Fun, Args);
|
||||||
global ->
|
global ->
|
||||||
|
|
|
@ -72,4 +72,4 @@ filter_result(Delivery) ->
|
||||||
Delivery.
|
Delivery.
|
||||||
|
|
||||||
max_client_num() ->
|
max_client_num() ->
|
||||||
emqx_config:get([rpc, tcp_client_num], ?DefaultClientNum).
|
emqx:get_config([rpc, tcp_client_num], ?DefaultClientNum).
|
||||||
|
|
|
@ -136,11 +136,11 @@ dispatch(Group, Topic, Delivery = #delivery{message = Msg}, FailedSubs) ->
|
||||||
|
|
||||||
-spec(strategy() -> strategy()).
|
-spec(strategy() -> strategy()).
|
||||||
strategy() ->
|
strategy() ->
|
||||||
emqx_config:get([broker, shared_subscription_strategy]).
|
emqx:get_config([broker, shared_subscription_strategy]).
|
||||||
|
|
||||||
-spec(ack_enabled() -> boolean()).
|
-spec(ack_enabled() -> boolean()).
|
||||||
ack_enabled() ->
|
ack_enabled() ->
|
||||||
emqx_config:get([broker, shared_dispatch_ack_enabled]).
|
emqx:get_config([broker, shared_dispatch_ack_enabled]).
|
||||||
|
|
||||||
do_dispatch(SubPid, Topic, Msg, _Type) when SubPid =:= self() ->
|
do_dispatch(SubPid, Topic, Msg, _Type) when SubPid =:= self() ->
|
||||||
%% Deadlock otherwise
|
%% Deadlock otherwise
|
||||||
|
|
|
@ -102,10 +102,10 @@ datetime() ->
|
||||||
"~4..0w-~2..0w-~2..0w ~2..0w:~2..0w:~2..0w", [Y, M, D, H, MM, S])).
|
"~4..0w-~2..0w-~2..0w ~2..0w:~2..0w:~2..0w", [Y, M, D, H, MM, S])).
|
||||||
|
|
||||||
sys_interval() ->
|
sys_interval() ->
|
||||||
emqx_config:get([broker, sys_msg_interval]).
|
emqx:get_config([broker, sys_msg_interval]).
|
||||||
|
|
||||||
sys_heatbeat_interval() ->
|
sys_heatbeat_interval() ->
|
||||||
emqx_config:get([broker, sys_heartbeat_interval]).
|
emqx:get_config([broker, sys_heartbeat_interval]).
|
||||||
|
|
||||||
%% @doc Get sys info
|
%% @doc Get sys info
|
||||||
-spec(info() -> list(tuple())).
|
-spec(info() -> list(tuple())).
|
||||||
|
|
|
@ -60,7 +60,7 @@ start_timer(State) ->
|
||||||
State#{timer := emqx_misc:start_timer(timer:seconds(2), reset)}.
|
State#{timer := emqx_misc:start_timer(timer:seconds(2), reset)}.
|
||||||
|
|
||||||
sysm_opts() ->
|
sysm_opts() ->
|
||||||
sysm_opts(maps:to_list(emqx_config:get([sysmon, vm])), []).
|
sysm_opts(maps:to_list(emqx:get_config([sysmon, vm])), []).
|
||||||
sysm_opts([], Acc) ->
|
sysm_opts([], Acc) ->
|
||||||
Acc;
|
Acc;
|
||||||
sysm_opts([{_, disabled}|Opts], Acc) ->
|
sysm_opts([{_, disabled}|Opts], Acc) ->
|
||||||
|
|
|
@ -270,7 +270,7 @@ match_compact([Word | Words], Prefix, IsWildcard, Acc0) ->
|
||||||
lookup_topic(MlTopic).
|
lookup_topic(MlTopic).
|
||||||
|
|
||||||
is_compact() ->
|
is_compact() ->
|
||||||
emqx_config:get([broker, perf, trie_compaction], true).
|
emqx:get_config([broker, perf, trie_compaction], true).
|
||||||
|
|
||||||
set_compact(Bool) ->
|
set_compact(Bool) ->
|
||||||
emqx_config:put([broker, perf, trie_compaction], Bool).
|
emqx_config:put([broker, perf, trie_compaction], Bool).
|
||||||
|
|
|
@ -57,8 +57,8 @@ handle_cast(Msg, State) ->
|
||||||
{noreply, State}.
|
{noreply, State}.
|
||||||
|
|
||||||
handle_info({timeout, _Timer, check}, State) ->
|
handle_info({timeout, _Timer, check}, State) ->
|
||||||
ProcHighWatermark = emqx_config:get([sysmon, vm, process_high_watermark]),
|
ProcHighWatermark = emqx:get_config([sysmon, vm, process_high_watermark]),
|
||||||
ProcLowWatermark = emqx_config:get([sysmon, vm, process_low_watermark]),
|
ProcLowWatermark = emqx:get_config([sysmon, vm, process_low_watermark]),
|
||||||
ProcessCount = erlang:system_info(process_count),
|
ProcessCount = erlang:system_info(process_count),
|
||||||
case ProcessCount / erlang:system_info(process_limit) of
|
case ProcessCount / erlang:system_info(process_limit) of
|
||||||
Percent when Percent >= ProcHighWatermark ->
|
Percent when Percent >= ProcHighWatermark ->
|
||||||
|
@ -89,5 +89,5 @@ code_change(_OldVsn, State, _Extra) ->
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
start_check_timer() ->
|
start_check_timer() ->
|
||||||
Interval = emqx_config:get([sysmon, vm, process_check_interval]),
|
Interval = emqx:get_config([sysmon, vm, process_check_interval]),
|
||||||
emqx_misc:start_timer(Interval, check).
|
emqx_misc:start_timer(Interval, check).
|
||||||
|
|
|
@ -25,7 +25,7 @@
|
||||||
all() -> emqx_ct:all(?MODULE).
|
all() -> emqx_ct:all(?MODULE).
|
||||||
|
|
||||||
t_check_pub(_) ->
|
t_check_pub(_) ->
|
||||||
OldConf = emqx_config:get([zones]),
|
OldConf = emqx:get_config([zones]),
|
||||||
emqx_config:put_zone_conf(default, [mqtt, max_qos_allowed], ?QOS_1),
|
emqx_config:put_zone_conf(default, [mqtt, max_qos_allowed], ?QOS_1),
|
||||||
emqx_config:put_zone_conf(default, [mqtt, retain_available], false),
|
emqx_config:put_zone_conf(default, [mqtt, retain_available], false),
|
||||||
timer:sleep(50),
|
timer:sleep(50),
|
||||||
|
@ -39,7 +39,7 @@ t_check_pub(_) ->
|
||||||
emqx_config:put([zones], OldConf).
|
emqx_config:put([zones], OldConf).
|
||||||
|
|
||||||
t_check_sub(_) ->
|
t_check_sub(_) ->
|
||||||
OldConf = emqx_config:get([zones]),
|
OldConf = emqx:get_config([zones]),
|
||||||
SubOpts = #{rh => 0,
|
SubOpts = #{rh => 0,
|
||||||
rap => 0,
|
rap => 0,
|
||||||
nl => 0,
|
nl => 0,
|
||||||
|
|
|
@ -36,7 +36,7 @@ stop(_State) ->
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
initialize() ->
|
initialize() ->
|
||||||
AuthNConfig = emqx_config:get([authentication], #{enable => false,
|
AuthNConfig = emqx:get_config([authentication], #{enable => false,
|
||||||
authenticators => []}),
|
authenticators => []}),
|
||||||
initialize(AuthNConfig).
|
initialize(AuthNConfig).
|
||||||
|
|
||||||
|
|
|
@ -47,7 +47,7 @@ register_metrics() ->
|
||||||
init() ->
|
init() ->
|
||||||
ok = register_metrics(),
|
ok = register_metrics(),
|
||||||
emqx_config_handler:add_handler(?CONF_KEY_PATH, ?MODULE),
|
emqx_config_handler:add_handler(?CONF_KEY_PATH, ?MODULE),
|
||||||
NRules = [init_rule(Rule) || Rule <- emqx_config:get(?CONF_KEY_PATH, [])],
|
NRules = [init_rule(Rule) || Rule <- emqx:get_config(?CONF_KEY_PATH, [])],
|
||||||
ok = emqx_hooks:add('client.authorize', {?MODULE, authorize, [NRules]}, -1).
|
ok = emqx_hooks:add('client.authorize', {?MODULE, authorize, [NRules]}, -1).
|
||||||
|
|
||||||
lookup() ->
|
lookup() ->
|
||||||
|
|
|
@ -87,7 +87,7 @@ t_update_rule(_) ->
|
||||||
{ok, _} = emqx_authz:update(tail, [?RULE3]),
|
{ok, _} = emqx_authz:update(tail, [?RULE3]),
|
||||||
|
|
||||||
Lists1 = emqx_authz:check_rules([?RULE1, ?RULE2, ?RULE3]),
|
Lists1 = emqx_authz:check_rules([?RULE1, ?RULE2, ?RULE3]),
|
||||||
?assertMatch(Lists1, emqx_config:get([authorization, rules], [])),
|
?assertMatch(Lists1, emqx:get_config([authorization, rules], [])),
|
||||||
|
|
||||||
[#{annotations := #{id := Id1,
|
[#{annotations := #{id := Id1,
|
||||||
principal := all,
|
principal := all,
|
||||||
|
@ -109,7 +109,7 @@ t_update_rule(_) ->
|
||||||
|
|
||||||
{ok, _} = emqx_authz:update({replace_once, Id3}, ?RULE4),
|
{ok, _} = emqx_authz:update({replace_once, Id3}, ?RULE4),
|
||||||
Lists2 = emqx_authz:check_rules([?RULE1, ?RULE2, ?RULE4]),
|
Lists2 = emqx_authz:check_rules([?RULE1, ?RULE2, ?RULE4]),
|
||||||
?assertMatch(Lists2, emqx_config:get([authorization, rules], [])),
|
?assertMatch(Lists2, emqx:get_config([authorization, rules], [])),
|
||||||
|
|
||||||
[#{annotations := #{id := Id1,
|
[#{annotations := #{id := Id1,
|
||||||
principal := all,
|
principal := all,
|
||||||
|
|
|
@ -39,7 +39,7 @@ start_link() ->
|
||||||
supervisor:start_link({local, ?MODULE}, ?MODULE, []).
|
supervisor:start_link({local, ?MODULE}, ?MODULE, []).
|
||||||
|
|
||||||
init([]) ->
|
init([]) ->
|
||||||
BridgesConf = emqx_config:get([?APP, bridges], []),
|
BridgesConf = emqx:get_config([?APP, bridges], []),
|
||||||
BridgeSpec = lists:map(fun bridge_spec/1, BridgesConf),
|
BridgeSpec = lists:map(fun bridge_spec/1, BridgesConf),
|
||||||
SupFlag = #{strategy => one_for_one,
|
SupFlag = #{strategy => one_for_one,
|
||||||
intensity => 100,
|
intensity => 100,
|
||||||
|
|
|
@ -98,7 +98,7 @@ stop_listener({Proto, Port, _}) ->
|
||||||
listeners() ->
|
listeners() ->
|
||||||
[{Protocol, Port, maps:to_list(maps:without([protocol, port], Map))}
|
[{Protocol, Port, maps:to_list(maps:without([protocol, port], Map))}
|
||||||
|| Map = #{protocol := Protocol,port := Port}
|
|| Map = #{protocol := Protocol,port := Port}
|
||||||
<- emqx_config:get([emqx_dashboard, listeners], [])].
|
<- emqx:get_config([emqx_dashboard, listeners], [])].
|
||||||
|
|
||||||
listener_name(Proto) ->
|
listener_name(Proto) ->
|
||||||
list_to_atom(atom_to_list(Proto) ++ ":dashboard").
|
list_to_atom(atom_to_list(Proto) ++ ":dashboard").
|
||||||
|
|
|
@ -201,7 +201,7 @@ add_default_user() ->
|
||||||
add_default_user(binenv(default_username), binenv(default_password)).
|
add_default_user(binenv(default_username), binenv(default_password)).
|
||||||
|
|
||||||
binenv(Key) ->
|
binenv(Key) ->
|
||||||
iolist_to_binary(emqx_config:get([emqx_dashboard, Key], "")).
|
iolist_to_binary(emqx:get_config([emqx_dashboard, Key], "")).
|
||||||
|
|
||||||
add_default_user(Username, Password) when ?EMPTY_KEY(Username) orelse ?EMPTY_KEY(Password) ->
|
add_default_user(Username, Password) when ?EMPTY_KEY(Username) orelse ?EMPTY_KEY(Password) ->
|
||||||
igonre;
|
igonre;
|
||||||
|
|
|
@ -58,7 +58,7 @@ get_collect() -> gen_server:call(whereis(?MODULE), get_collect).
|
||||||
init([]) ->
|
init([]) ->
|
||||||
timer(next_interval(), collect),
|
timer(next_interval(), collect),
|
||||||
timer(get_today_remaining_seconds(), clear_expire_data),
|
timer(get_today_remaining_seconds(), clear_expire_data),
|
||||||
ExpireInterval = emqx_config:get([emqx_dashboard, monitor, interval], ?EXPIRE_INTERVAL),
|
ExpireInterval = emqx:get_config([emqx_dashboard, monitor, interval], ?EXPIRE_INTERVAL),
|
||||||
State = #{
|
State = #{
|
||||||
count => count(),
|
count => count(),
|
||||||
expire_interval => ExpireInterval,
|
expire_interval => ExpireInterval,
|
||||||
|
@ -78,7 +78,7 @@ next_interval() ->
|
||||||
(1000 * interval()) - (erlang:system_time(millisecond) rem (1000 * interval())) - 1.
|
(1000 * interval()) - (erlang:system_time(millisecond) rem (1000 * interval())) - 1.
|
||||||
|
|
||||||
interval() ->
|
interval() ->
|
||||||
emqx_config:get([?APP, sample_interval], ?DEFAULT_INTERVAL).
|
emqx:get_config([?APP, sample_interval], ?DEFAULT_INTERVAL).
|
||||||
|
|
||||||
count() ->
|
count() ->
|
||||||
60 div interval().
|
60 div interval().
|
||||||
|
|
|
@ -148,7 +148,7 @@ jwk(Username, Password, Salt) ->
|
||||||
}.
|
}.
|
||||||
|
|
||||||
jwt_expiration_time() ->
|
jwt_expiration_time() ->
|
||||||
ExpTime = emqx_config:get([emqx_dashboard, token_expired_time], ?EXPTIME),
|
ExpTime = emqx:get_config([emqx_dashboard, token_expired_time], ?EXPTIME),
|
||||||
erlang:system_time(millisecond) + ExpTime.
|
erlang:system_time(millisecond) + ExpTime.
|
||||||
|
|
||||||
salt() ->
|
salt() ->
|
||||||
|
|
|
@ -27,7 +27,7 @@
|
||||||
]).
|
]).
|
||||||
|
|
||||||
load_bridges() ->
|
load_bridges() ->
|
||||||
Bridges = emqx_config:get([emqx_data_bridge, bridges], []),
|
Bridges = emqx:get_config([emqx_data_bridge, bridges], []),
|
||||||
emqx_data_bridge_monitor:ensure_all_started(Bridges).
|
emqx_data_bridge_monitor:ensure_all_started(Bridges).
|
||||||
|
|
||||||
resource_type(mysql) -> emqx_connector_mysql;
|
resource_type(mysql) -> emqx_connector_mysql;
|
||||||
|
|
|
@ -58,7 +58,7 @@ request_options() ->
|
||||||
}.
|
}.
|
||||||
|
|
||||||
env(Key, Def) ->
|
env(Key, Def) ->
|
||||||
emqx_config:get([exhook, Key], Def).
|
emqx:get_config([exhook, Key], Def).
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% APIs
|
%% APIs
|
||||||
|
|
|
@ -79,4 +79,4 @@ load_gateway_by_default([{Type, Confs}|More]) ->
|
||||||
load_gateway_by_default(More).
|
load_gateway_by_default(More).
|
||||||
|
|
||||||
confs() ->
|
confs() ->
|
||||||
maps:to_list(emqx_config:get([gateway], [])).
|
maps:to_list(emqx:get_config([gateway], [])).
|
||||||
|
|
|
@ -590,7 +590,7 @@ check_row_limit([Tab|Tables], Limit) ->
|
||||||
end.
|
end.
|
||||||
|
|
||||||
max_row_limit() ->
|
max_row_limit() ->
|
||||||
emqx_config:get([?APP, max_row_limit], ?MAX_ROW_LIMIT).
|
emqx:get_config([?APP, max_row_limit], ?MAX_ROW_LIMIT).
|
||||||
|
|
||||||
table_size(Tab) -> ets:info(Tab, size).
|
table_size(Tab) -> ets:info(Tab, size).
|
||||||
|
|
||||||
|
|
|
@ -56,7 +56,7 @@ api_spec() ->
|
||||||
|
|
||||||
config_apis() ->
|
config_apis() ->
|
||||||
[config_api(ConfPath, Schema) || {ConfPath, Schema} <-
|
[config_api(ConfPath, Schema) || {ConfPath, Schema} <-
|
||||||
get_conf_schema(emqx_config:get([]), ?MAX_DEPTH), is_core_conf(ConfPath)].
|
get_conf_schema(emqx:get_config([]), ?MAX_DEPTH), is_core_conf(ConfPath)].
|
||||||
|
|
||||||
config_api(ConfPath, Schema) ->
|
config_api(ConfPath, Schema) ->
|
||||||
Path = path_join(ConfPath),
|
Path = path_join(ConfPath),
|
||||||
|
@ -131,7 +131,7 @@ config_reset(post, Req) ->
|
||||||
|
|
||||||
get_full_config() ->
|
get_full_config() ->
|
||||||
emqx_map_lib:jsonable_map(
|
emqx_map_lib:jsonable_map(
|
||||||
emqx_config:fill_defaults(emqx_config:get_raw([]))).
|
emqx_config:fill_defaults(emqx:get_raw_config([]))).
|
||||||
|
|
||||||
conf_path_from_querystr(Req) ->
|
conf_path_from_querystr(Req) ->
|
||||||
case proplists:get_value(<<"conf_path">>, cowboy_req:parse_qs(Req)) of
|
case proplists:get_value(<<"conf_path">>, cowboy_req:parse_qs(Req)) of
|
||||||
|
|
|
@ -68,7 +68,7 @@ mnesia(copy) ->
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
-spec(add_default_app() -> list()).
|
-spec(add_default_app() -> list()).
|
||||||
add_default_app() ->
|
add_default_app() ->
|
||||||
Apps = emqx_config:get([?APP, applications], []),
|
Apps = emqx:get_config([?APP, applications], []),
|
||||||
[ begin
|
[ begin
|
||||||
case {AppId, AppSecret} of
|
case {AppId, AppSecret} of
|
||||||
{undefined, _} -> ok;
|
{undefined, _} -> ok;
|
||||||
|
|
|
@ -94,7 +94,7 @@ stop_listener({Proto, Port, _}) ->
|
||||||
listeners() ->
|
listeners() ->
|
||||||
[{Protocol, Port, maps:to_list(maps:without([protocol, port], Map))}
|
[{Protocol, Port, maps:to_list(maps:without([protocol, port], Map))}
|
||||||
|| Map = #{protocol := Protocol,port := Port}
|
|| Map = #{protocol := Protocol,port := Port}
|
||||||
<- emqx_config:get([emqx_management, listeners], [])].
|
<- emqx:get_config([emqx_management, listeners], [])].
|
||||||
|
|
||||||
listener_name(Proto) ->
|
listener_name(Proto) ->
|
||||||
list_to_atom(atom_to_list(Proto) ++ ":management").
|
list_to_atom(atom_to_list(Proto) ++ ":management").
|
||||||
|
|
|
@ -104,7 +104,7 @@ on_message_publish(Msg) ->
|
||||||
|
|
||||||
-spec(start_link() -> emqx_types:startlink_ret()).
|
-spec(start_link() -> emqx_types:startlink_ret()).
|
||||||
start_link() ->
|
start_link() ->
|
||||||
Opts = emqx_config:get([delayed], #{}),
|
Opts = emqx:get_config([delayed], #{}),
|
||||||
gen_server:start_link({local, ?SERVER}, ?MODULE, [Opts], []).
|
gen_server:start_link({local, ?SERVER}, ?MODULE, [Opts], []).
|
||||||
|
|
||||||
-spec(store(#delayed_message{}) -> ok | {error, atom()}).
|
-spec(store(#delayed_message{}) -> ok | {error, atom()}).
|
||||||
|
|
|
@ -179,4 +179,4 @@ rpc_call(Node, Module, Fun, Args) ->
|
||||||
end.
|
end.
|
||||||
|
|
||||||
get_status() ->
|
get_status() ->
|
||||||
emqx_config:get([delayed, enable], true).
|
emqx:get_config([delayed, enable], true).
|
||||||
|
|
|
@ -38,7 +38,7 @@
|
||||||
-endif.
|
-endif.
|
||||||
|
|
||||||
enable() ->
|
enable() ->
|
||||||
Topics = emqx_config:get([event_message, topics], []),
|
Topics = emqx:get_config([event_message, topics], []),
|
||||||
lists:foreach(fun(Topic) ->
|
lists:foreach(fun(Topic) ->
|
||||||
case Topic of
|
case Topic of
|
||||||
<<"$event/client_connected">> ->
|
<<"$event/client_connected">> ->
|
||||||
|
@ -61,7 +61,7 @@ enable() ->
|
||||||
end, Topics).
|
end, Topics).
|
||||||
|
|
||||||
disable() ->
|
disable() ->
|
||||||
Topics = emqx_config:get([event_message, topics], []),
|
Topics = emqx:get_config([event_message, topics], []),
|
||||||
lists:foreach(fun(Topic) ->
|
lists:foreach(fun(Topic) ->
|
||||||
case Topic of
|
case Topic of
|
||||||
<<"$event/client_connected">> ->
|
<<"$event/client_connected">> ->
|
||||||
|
|
|
@ -32,17 +32,17 @@ stop(_State) ->
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
maybe_enable_modules() ->
|
maybe_enable_modules() ->
|
||||||
emqx_config:get([delayed, enable], true) andalso emqx_delayed:enable(),
|
emqx:get_config([delayed, enable], true) andalso emqx_delayed:enable(),
|
||||||
emqx_config:get([telemetry, enable], true) andalso emqx_telemetry:enable(),
|
emqx:get_config([telemetry, enable], true) andalso emqx_telemetry:enable(),
|
||||||
emqx_config:get([recon, enable], true) andalso emqx_recon:enable(),
|
emqx:get_config([recon, enable], true) andalso emqx_recon:enable(),
|
||||||
emqx_event_message:enable(),
|
emqx_event_message:enable(),
|
||||||
emqx_rewrite:enable(),
|
emqx_rewrite:enable(),
|
||||||
emqx_topic_metrics:enable().
|
emqx_topic_metrics:enable().
|
||||||
|
|
||||||
maybe_disable_modules() ->
|
maybe_disable_modules() ->
|
||||||
emqx_config:get([delayed, enable], true) andalso emqx_delayed:disable(),
|
emqx:get_config([delayed, enable], true) andalso emqx_delayed:disable(),
|
||||||
emqx_config:get([telemetry, enable], true) andalso emqx_telemetry:disable(),
|
emqx:get_config([telemetry, enable], true) andalso emqx_telemetry:disable(),
|
||||||
emqx_config:get([recon, enable], true) andalso emqx_recon:disable(),
|
emqx:get_config([recon, enable], true) andalso emqx_recon:disable(),
|
||||||
emqx_event_message:disable(),
|
emqx_event_message:disable(),
|
||||||
emqx_rewrite:disable(),
|
emqx_rewrite:disable(),
|
||||||
emqx_topic_metrics:disable().
|
emqx_topic_metrics:disable().
|
||||||
|
|
|
@ -43,7 +43,7 @@
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
enable() ->
|
enable() ->
|
||||||
Rules = emqx_config:get([rewrite, rules], []),
|
Rules = emqx:get_config([rewrite, rules], []),
|
||||||
register_hook(Rules).
|
register_hook(Rules).
|
||||||
|
|
||||||
disable() ->
|
disable() ->
|
||||||
|
@ -52,10 +52,10 @@ disable() ->
|
||||||
emqx_hooks:del('message.publish', {?MODULE, rewrite_publish}).
|
emqx_hooks:del('message.publish', {?MODULE, rewrite_publish}).
|
||||||
|
|
||||||
list() ->
|
list() ->
|
||||||
maps:get(<<"rules">>, emqx_config:get_raw([<<"rewrite">>], #{}), []).
|
maps:get(<<"rules">>, emqx:get_raw_config([<<"rewrite">>], #{}), []).
|
||||||
|
|
||||||
update(Rules0) ->
|
update(Rules0) ->
|
||||||
Rewrite = emqx_config:get_raw([<<"rewrite">>], #{}),
|
Rewrite = emqx:get_raw_config([<<"rewrite">>], #{}),
|
||||||
{ok, #{config := Config}} = emqx:update_config([rewrite], maps:put(<<"rules">>,
|
{ok, #{config := Config}} = emqx:update_config([rewrite], maps:put(<<"rules">>,
|
||||||
Rules0, Rewrite)),
|
Rules0, Rewrite)),
|
||||||
Rules = maps:get(rules, maps:get(rewrite, Config, #{}), []),
|
Rules = maps:get(rules, maps:get(rewrite, Config, #{}), []),
|
||||||
|
|
|
@ -107,7 +107,7 @@ mnesia(copy) ->
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
start_link() ->
|
start_link() ->
|
||||||
Opts = emqx_config:get([telemetry], #{}),
|
Opts = emqx:get_config([telemetry], #{}),
|
||||||
gen_server:start_link({local, ?MODULE}, ?MODULE, [Opts], []).
|
gen_server:start_link({local, ?MODULE}, ?MODULE, [Opts], []).
|
||||||
|
|
||||||
stop() ->
|
stop() ->
|
||||||
|
@ -120,7 +120,7 @@ disable() ->
|
||||||
gen_server:call(?MODULE, disable).
|
gen_server:call(?MODULE, disable).
|
||||||
|
|
||||||
get_status() ->
|
get_status() ->
|
||||||
emqx_config:get([telemetry, enable], true).
|
emqx:get_config([telemetry, enable], true).
|
||||||
|
|
||||||
get_uuid() ->
|
get_uuid() ->
|
||||||
gen_server:call(?MODULE, get_uuid).
|
gen_server:call(?MODULE, get_uuid).
|
||||||
|
|
|
@ -137,7 +137,7 @@ on_message_dropped(#message{topic = Topic}, _, _) ->
|
||||||
end.
|
end.
|
||||||
|
|
||||||
start_link() ->
|
start_link() ->
|
||||||
Opts = emqx_config:get([topic_metrics], #{}),
|
Opts = emqx:get_config([topic_metrics], #{}),
|
||||||
gen_server:start_link({local, ?MODULE}, ?MODULE, [Opts], []).
|
gen_server:start_link({local, ?MODULE}, ?MODULE, [Opts], []).
|
||||||
|
|
||||||
stop() ->
|
stop() ->
|
||||||
|
|
|
@ -45,7 +45,7 @@
|
||||||
-spec save_files_return_opts(opts_input(), atom() | string() | binary(),
|
-spec save_files_return_opts(opts_input(), atom() | string() | binary(),
|
||||||
string() | binary()) -> opts().
|
string() | binary()) -> opts().
|
||||||
save_files_return_opts(Options, SubDir, ResId) ->
|
save_files_return_opts(Options, SubDir, ResId) ->
|
||||||
Dir = filename:join([emqx_config:get([node, data_dir]), SubDir, ResId]),
|
Dir = filename:join([emqx:get_config([node, data_dir]), SubDir, ResId]),
|
||||||
save_files_return_opts(Options, Dir).
|
save_files_return_opts(Options, Dir).
|
||||||
|
|
||||||
%% @doc Parse ssl options input.
|
%% @doc Parse ssl options input.
|
||||||
|
@ -76,7 +76,7 @@ save_files_return_opts(Options, Dir) ->
|
||||||
%% empty string is returned if the input is empty.
|
%% empty string is returned if the input is empty.
|
||||||
-spec save_file(file_input(), atom() | string() | binary()) -> string().
|
-spec save_file(file_input(), atom() | string() | binary()) -> string().
|
||||||
save_file(Param, SubDir) ->
|
save_file(Param, SubDir) ->
|
||||||
Dir = filename:join([emqx_config:get([node, data_dir]), SubDir]),
|
Dir = filename:join([emqx:get_config([node, data_dir]), SubDir]),
|
||||||
do_save_file(Param, Dir).
|
do_save_file(Param, Dir).
|
||||||
|
|
||||||
filter([]) -> [];
|
filter([]) -> [];
|
||||||
|
|
|
@ -106,7 +106,7 @@ prometheus_api() ->
|
||||||
% {"/prometheus/stats", Metadata, stats}.
|
% {"/prometheus/stats", Metadata, stats}.
|
||||||
|
|
||||||
prometheus(get, _Request) ->
|
prometheus(get, _Request) ->
|
||||||
Response = emqx_config:get_raw([<<"prometheus">>], #{}),
|
Response = emqx:get_raw_config([<<"prometheus">>], #{}),
|
||||||
{200, Response};
|
{200, Response};
|
||||||
|
|
||||||
prometheus(put, Request) ->
|
prometheus(put, Request) ->
|
||||||
|
@ -128,11 +128,11 @@ prometheus(put, Request) ->
|
||||||
|
|
||||||
enable_prometheus(true) ->
|
enable_prometheus(true) ->
|
||||||
ok = emqx_prometheus_sup:stop_child(?APP),
|
ok = emqx_prometheus_sup:stop_child(?APP),
|
||||||
emqx_prometheus_sup:start_child(?APP, emqx_config:get([prometheus], #{})),
|
emqx_prometheus_sup:start_child(?APP, emqx:get_config([prometheus], #{})),
|
||||||
{200};
|
{200};
|
||||||
enable_prometheus(false) ->
|
enable_prometheus(false) ->
|
||||||
_ = emqx_prometheus_sup:stop_child(?APP),
|
_ = emqx_prometheus_sup:stop_child(?APP),
|
||||||
{200}.
|
{200}.
|
||||||
|
|
||||||
get_raw(Key, Def) ->
|
get_raw(Key, Def) ->
|
||||||
emqx_config:get_raw([<<"prometheus">>] ++ [Key], Def).
|
emqx:get_raw_config([<<"prometheus">>] ++ [Key], Def).
|
||||||
|
|
|
@ -34,9 +34,9 @@ stop(_State) ->
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
maybe_enable_prometheus() ->
|
maybe_enable_prometheus() ->
|
||||||
case emqx_config:get([prometheus, enable], false) of
|
case emqx:get_config([prometheus, enable], false) of
|
||||||
true ->
|
true ->
|
||||||
emqx_prometheus_sup:start_child(?APP, emqx_config:get([prometheus], #{}));
|
emqx_prometheus_sup:start_child(?APP, emqx:get_config([prometheus], #{}));
|
||||||
false ->
|
false ->
|
||||||
ok
|
ok
|
||||||
end.
|
end.
|
||||||
|
|
|
@ -129,7 +129,7 @@ deliver(Result, #{context_id := Id} = Context, Pid, Topic, Cursor) ->
|
||||||
false ->
|
false ->
|
||||||
ok;
|
ok;
|
||||||
_ ->
|
_ ->
|
||||||
#{msg_deliver_quota := MaxDeliverNum} = emqx_config:get([?APP, flow_control]),
|
#{msg_deliver_quota := MaxDeliverNum} = emqx:get_config([?APP, flow_control]),
|
||||||
case MaxDeliverNum of
|
case MaxDeliverNum of
|
||||||
0 ->
|
0 ->
|
||||||
_ = [Pid ! {deliver, Topic, Msg} || Msg <- Result],
|
_ = [Pid ! {deliver, Topic, Msg} || Msg <- Result],
|
||||||
|
@ -150,7 +150,7 @@ get_expiry_time(#message{headers = #{properties := #{'Message-Expiry-Interval' :
|
||||||
timestamp = Ts}) ->
|
timestamp = Ts}) ->
|
||||||
Ts + Interval * 1000;
|
Ts + Interval * 1000;
|
||||||
get_expiry_time(#message{timestamp = Ts}) ->
|
get_expiry_time(#message{timestamp = Ts}) ->
|
||||||
Interval = emqx_config:get([?APP, msg_expiry_interval], ?DEF_EXPIRY_INTERVAL),
|
Interval = emqx:get_config([?APP, msg_expiry_interval], ?DEF_EXPIRY_INTERVAL),
|
||||||
case Interval of
|
case Interval of
|
||||||
0 -> 0;
|
0 -> 0;
|
||||||
_ -> Ts + Interval
|
_ -> Ts + Interval
|
||||||
|
@ -173,7 +173,7 @@ delete(Topic) ->
|
||||||
init([]) ->
|
init([]) ->
|
||||||
init_shared_context(),
|
init_shared_context(),
|
||||||
State = new_state(),
|
State = new_state(),
|
||||||
#{enable := Enable} = Cfg = emqx_config:get([?APP]),
|
#{enable := Enable} = Cfg = emqx:get_config([?APP]),
|
||||||
{ok,
|
{ok,
|
||||||
case Enable of
|
case Enable of
|
||||||
true ->
|
true ->
|
||||||
|
@ -209,7 +209,7 @@ handle_cast(Msg, State) ->
|
||||||
handle_info(clear_expired, #{context := Context} = State) ->
|
handle_info(clear_expired, #{context := Context} = State) ->
|
||||||
Mod = get_backend_module(),
|
Mod = get_backend_module(),
|
||||||
Mod:clear_expired(Context),
|
Mod:clear_expired(Context),
|
||||||
Interval = emqx_config:get([?APP, msg_clear_interval], ?DEF_EXPIRY_INTERVAL),
|
Interval = emqx:get_config([?APP, msg_clear_interval], ?DEF_EXPIRY_INTERVAL),
|
||||||
{noreply, State#{clear_timer := add_timer(Interval, clear_expired)}, hibernate};
|
{noreply, State#{clear_timer := add_timer(Interval, clear_expired)}, hibernate};
|
||||||
|
|
||||||
handle_info(release_deliver_quota, #{context := Context, wait_quotas := Waits} = State) ->
|
handle_info(release_deliver_quota, #{context := Context, wait_quotas := Waits} = State) ->
|
||||||
|
@ -225,7 +225,7 @@ handle_info(release_deliver_quota, #{context := Context, wait_quotas := Waits} =
|
||||||
end,
|
end,
|
||||||
Waits2)
|
Waits2)
|
||||||
end,
|
end,
|
||||||
Interval = emqx_config:get([?APP, flow_control, quota_release_interval]),
|
Interval = emqx:get_config([?APP, flow_control, quota_release_interval]),
|
||||||
{noreply, State#{release_quota_timer := add_timer(Interval, release_deliver_quota),
|
{noreply, State#{release_quota_timer := add_timer(Interval, release_deliver_quota),
|
||||||
wait_quotas := []}};
|
wait_quotas := []}};
|
||||||
|
|
||||||
|
@ -258,7 +258,7 @@ new_context(Id) ->
|
||||||
#{context_id => Id}.
|
#{context_id => Id}.
|
||||||
|
|
||||||
is_too_big(Size) ->
|
is_too_big(Size) ->
|
||||||
Limit = emqx_config:get([?APP, max_payload_size], ?DEF_MAX_PAYLOAD_SIZE),
|
Limit = emqx:get_config([?APP, max_payload_size], ?DEF_MAX_PAYLOAD_SIZE),
|
||||||
Limit > 0 andalso (Size > Limit).
|
Limit > 0 andalso (Size > Limit).
|
||||||
|
|
||||||
%% @private
|
%% @private
|
||||||
|
@ -332,7 +332,7 @@ insert_shared_context(Key, Term) ->
|
||||||
|
|
||||||
-spec get_msg_deliver_quota() -> non_neg_integer().
|
-spec get_msg_deliver_quota() -> non_neg_integer().
|
||||||
get_msg_deliver_quota() ->
|
get_msg_deliver_quota() ->
|
||||||
emqx_config:get([?APP, flow_control, msg_deliver_quota]).
|
emqx:get_config([?APP, flow_control, msg_deliver_quota]).
|
||||||
|
|
||||||
-spec update_config(state(), hocons:config()) -> state().
|
-spec update_config(state(), hocons:config()) -> state().
|
||||||
update_config(#{clear_timer := ClearTimer,
|
update_config(#{clear_timer := ClearTimer,
|
||||||
|
@ -342,7 +342,7 @@ update_config(#{clear_timer := ClearTimer,
|
||||||
flow_control := #{quota_release_interval := QuotaInterval},
|
flow_control := #{quota_release_interval := QuotaInterval},
|
||||||
msg_clear_interval := ClearInterval} = Conf,
|
msg_clear_interval := ClearInterval} = Conf,
|
||||||
|
|
||||||
#{config := OldConfig} = emqx_config:get([?APP]),
|
#{config := OldConfig} = emqx:get_config([?APP]),
|
||||||
|
|
||||||
case Enable of
|
case Enable of
|
||||||
true ->
|
true ->
|
||||||
|
@ -416,7 +416,7 @@ check_timer(Timer, _, _) ->
|
||||||
|
|
||||||
-spec get_backend_module() -> backend().
|
-spec get_backend_module() -> backend().
|
||||||
get_backend_module() ->
|
get_backend_module() ->
|
||||||
#{type := Backend} = emqx_config:get([?APP, config]),
|
#{type := Backend} = emqx:get_config([?APP, config]),
|
||||||
ModName = if Backend =:= built_in_database ->
|
ModName = if Backend =:= built_in_database ->
|
||||||
mnesia;
|
mnesia;
|
||||||
true ->
|
true ->
|
||||||
|
|
|
@ -35,7 +35,7 @@
|
||||||
]).
|
]).
|
||||||
|
|
||||||
lookup_config(_Bindings, _Params) ->
|
lookup_config(_Bindings, _Params) ->
|
||||||
Config = emqx_config:get([emqx_retainer]),
|
Config = emqx:get_config([emqx_retainer]),
|
||||||
return({ok, Config}).
|
return({ok, Config}).
|
||||||
|
|
||||||
update_config(_Bindings, Params) ->
|
update_config(_Bindings, Params) ->
|
||||||
|
|
|
@ -130,7 +130,7 @@ read_message(_, Topic) ->
|
||||||
{ok, read_messages(Topic)}.
|
{ok, read_messages(Topic)}.
|
||||||
|
|
||||||
match_messages(_, Topic, Cursor) ->
|
match_messages(_, Topic, Cursor) ->
|
||||||
MaxReadNum = emqx_config:get([?APP, flow_control, max_read_number]),
|
MaxReadNum = emqx:get_config([?APP, flow_control, max_read_number]),
|
||||||
case Cursor of
|
case Cursor of
|
||||||
undefined ->
|
undefined ->
|
||||||
case MaxReadNum of
|
case MaxReadNum of
|
||||||
|
@ -227,7 +227,7 @@ make_match_spec(Filter) ->
|
||||||
|
|
||||||
-spec is_table_full() -> boolean().
|
-spec is_table_full() -> boolean().
|
||||||
is_table_full() ->
|
is_table_full() ->
|
||||||
#{max_retained_messages := Limit} = emqx_config:get([?APP, config]),
|
#{max_retained_messages := Limit} = emqx:get_config([?APP, config]),
|
||||||
Limit > 0 andalso (table_size() >= Limit).
|
Limit > 0 andalso (table_size() >= Limit).
|
||||||
|
|
||||||
-spec table_size() -> non_neg_integer().
|
-spec table_size() -> non_neg_integer().
|
||||||
|
|
|
@ -506,7 +506,7 @@ connect(Options) when is_list(Options) ->
|
||||||
connect(Options = #{disk_cache := DiskCache, ecpool_worker_id := Id, pool_name := Pool}) ->
|
connect(Options = #{disk_cache := DiskCache, ecpool_worker_id := Id, pool_name := Pool}) ->
|
||||||
Options0 = case DiskCache of
|
Options0 = case DiskCache of
|
||||||
true ->
|
true ->
|
||||||
DataDir = filename:join([emqx_config:get([node, data_dir]), replayq, Pool, integer_to_list(Id)]),
|
DataDir = filename:join([emqx:get_config([node, data_dir]), replayq, Pool, integer_to_list(Id)]),
|
||||||
QueueOption = #{replayq_dir => DataDir},
|
QueueOption = #{replayq_dir => DataDir},
|
||||||
Options#{queue => QueueOption};
|
Options#{queue => QueueOption};
|
||||||
false ->
|
false ->
|
||||||
|
|
|
@ -595,4 +595,4 @@ printable_maps(Headers) ->
|
||||||
|
|
||||||
ignore_sys_message(#message{flags = Flags}) ->
|
ignore_sys_message(#message{flags = Flags}) ->
|
||||||
maps:get(sys, Flags, false) andalso
|
maps:get(sys, Flags, false) andalso
|
||||||
emqx_config:get([emqx_rule_engine, ignore_sys_message]).
|
emqx:get_config([emqx_rule_engine, ignore_sys_message]).
|
||||||
|
|
|
@ -84,7 +84,7 @@ statsd_api() ->
|
||||||
[{"/statsd", Metadata, statsd}].
|
[{"/statsd", Metadata, statsd}].
|
||||||
|
|
||||||
statsd(get, _Request) ->
|
statsd(get, _Request) ->
|
||||||
Response = emqx_config:get_raw([<<"statsd">>], #{}),
|
Response = emqx:get_raw_config([<<"statsd">>], #{}),
|
||||||
{200, Response};
|
{200, Response};
|
||||||
|
|
||||||
statsd(put, Request) ->
|
statsd(put, Request) ->
|
||||||
|
@ -96,11 +96,11 @@ statsd(put, Request) ->
|
||||||
|
|
||||||
enable_statsd(true) ->
|
enable_statsd(true) ->
|
||||||
ok = emqx_statsd_sup:stop_child(?APP),
|
ok = emqx_statsd_sup:stop_child(?APP),
|
||||||
emqx_statsd_sup:start_child(?APP, emqx_config:get([statsd], #{})),
|
emqx_statsd_sup:start_child(?APP, emqx:get_config([statsd], #{})),
|
||||||
{200};
|
{200};
|
||||||
enable_statsd(false) ->
|
enable_statsd(false) ->
|
||||||
_ = emqx_statsd_sup:stop_child(?APP),
|
_ = emqx_statsd_sup:stop_child(?APP),
|
||||||
{200}.
|
{200}.
|
||||||
|
|
||||||
get_raw(Key, Def) ->
|
get_raw(Key, Def) ->
|
||||||
emqx_config:get_raw([<<"statsd">>]++ [Key], Def).
|
emqx:get_raw_config([<<"statsd">>]++ [Key], Def).
|
||||||
|
|
|
@ -32,9 +32,9 @@ stop(_) ->
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
maybe_enable_statsd() ->
|
maybe_enable_statsd() ->
|
||||||
case emqx_config:get([statsd, enable], false) of
|
case emqx:get_config([statsd, enable], false) of
|
||||||
true ->
|
true ->
|
||||||
emqx_statsd_sup:start_child(?APP, emqx_config:get([statsd], #{}));
|
emqx_statsd_sup:start_child(?APP, emqx:get_config([statsd], #{}));
|
||||||
false ->
|
false ->
|
||||||
ok
|
ok
|
||||||
end.
|
end.
|
||||||
|
|
Loading…
Reference in New Issue