Merge pull request #10698 from qzhuyan/perf/william/force-atom-conf-path
perf(config): enforcing atom key path in hotcode path
This commit is contained in:
commit
e824e1db39
|
@ -184,11 +184,18 @@ run_fold_hook(HookPoint, Args, Acc) ->
|
||||||
|
|
||||||
-spec get_config(emqx_utils_maps:config_key_path()) -> term().
|
-spec get_config(emqx_utils_maps:config_key_path()) -> term().
|
||||||
get_config(KeyPath) ->
|
get_config(KeyPath) ->
|
||||||
emqx_config:get(KeyPath).
|
KeyPath1 = emqx_config:ensure_atom_conf_path(KeyPath, {raise_error, config_not_found}),
|
||||||
|
emqx_config:get(KeyPath1).
|
||||||
|
|
||||||
-spec get_config(emqx_utils_maps:config_key_path(), term()) -> term().
|
-spec get_config(emqx_utils_maps:config_key_path(), term()) -> term().
|
||||||
get_config(KeyPath, Default) ->
|
get_config(KeyPath, Default) ->
|
||||||
emqx_config:get(KeyPath, Default).
|
try
|
||||||
|
KeyPath1 = emqx_config:ensure_atom_conf_path(KeyPath, {raise_error, config_not_found}),
|
||||||
|
emqx_config:get(KeyPath1, Default)
|
||||||
|
catch
|
||||||
|
error:config_not_found ->
|
||||||
|
Default
|
||||||
|
end.
|
||||||
|
|
||||||
-spec get_raw_config(emqx_utils_maps:config_key_path()) -> term().
|
-spec get_raw_config(emqx_utils_maps:config_key_path()) -> term().
|
||||||
get_raw_config(KeyPath) ->
|
get_raw_config(KeyPath) ->
|
||||||
|
|
|
@ -88,6 +88,8 @@
|
||||||
remove_handlers/0
|
remove_handlers/0
|
||||||
]).
|
]).
|
||||||
|
|
||||||
|
-export([ensure_atom_conf_path/2]).
|
||||||
|
|
||||||
-ifdef(TEST).
|
-ifdef(TEST).
|
||||||
-export([erase_all/0]).
|
-export([erase_all/0]).
|
||||||
-endif.
|
-endif.
|
||||||
|
@ -113,7 +115,8 @@
|
||||||
update_cmd/0,
|
update_cmd/0,
|
||||||
update_args/0,
|
update_args/0,
|
||||||
update_error/0,
|
update_error/0,
|
||||||
update_result/0
|
update_result/0,
|
||||||
|
runtime_config_key_path/0
|
||||||
]).
|
]).
|
||||||
|
|
||||||
-type update_request() :: term().
|
-type update_request() :: term().
|
||||||
|
@ -144,6 +147,8 @@
|
||||||
-type config() :: #{atom() => term()} | list() | undefined.
|
-type config() :: #{atom() => term()} | list() | undefined.
|
||||||
-type app_envs() :: [proplists:property()].
|
-type app_envs() :: [proplists:property()].
|
||||||
|
|
||||||
|
-type runtime_config_key_path() :: [atom()].
|
||||||
|
|
||||||
%% @doc For the given path, get root value enclosed in a single-key map.
|
%% @doc For the given path, get root value enclosed in a single-key map.
|
||||||
-spec get_root(emqx_utils_maps:config_key_path()) -> map().
|
-spec get_root(emqx_utils_maps:config_key_path()) -> map().
|
||||||
get_root([RootName | _]) ->
|
get_root([RootName | _]) ->
|
||||||
|
@ -156,25 +161,21 @@ get_root_raw([RootName | _]) ->
|
||||||
|
|
||||||
%% @doc Get a config value for the given path.
|
%% @doc Get a config value for the given path.
|
||||||
%% The path should at least include root config name.
|
%% The path should at least include root config name.
|
||||||
-spec get(emqx_utils_maps:config_key_path()) -> term().
|
-spec get(runtime_config_key_path()) -> term().
|
||||||
get(KeyPath) -> do_get(?CONF, KeyPath).
|
get(KeyPath) -> do_get(?CONF, KeyPath).
|
||||||
|
|
||||||
-spec get(emqx_utils_maps:config_key_path(), term()) -> term().
|
-spec get(runtime_config_key_path(), term()) -> term().
|
||||||
get(KeyPath, Default) -> do_get(?CONF, KeyPath, Default).
|
get(KeyPath, Default) -> do_get(?CONF, KeyPath, Default).
|
||||||
|
|
||||||
-spec find(emqx_utils_maps:config_key_path()) ->
|
-spec find(runtime_config_key_path()) ->
|
||||||
{ok, term()} | {not_found, emqx_utils_maps:config_key_path(), term()}.
|
{ok, term()} | {not_found, emqx_utils_maps:config_key_path(), term()}.
|
||||||
find([]) ->
|
find([]) ->
|
||||||
case do_get(?CONF, [], ?CONFIG_NOT_FOUND_MAGIC) of
|
case do_get(?CONF, [], ?CONFIG_NOT_FOUND_MAGIC) of
|
||||||
?CONFIG_NOT_FOUND_MAGIC -> {not_found, []};
|
?CONFIG_NOT_FOUND_MAGIC -> {not_found, []};
|
||||||
Res -> {ok, Res}
|
Res -> {ok, Res}
|
||||||
end;
|
end;
|
||||||
find(KeyPath) ->
|
find(AtomKeyPath) ->
|
||||||
atom_conf_path(
|
emqx_utils_maps:deep_find(AtomKeyPath, get_root(AtomKeyPath)).
|
||||||
KeyPath,
|
|
||||||
fun(AtomKeyPath) -> emqx_utils_maps:deep_find(AtomKeyPath, get_root(KeyPath)) end,
|
|
||||||
{return, {not_found, KeyPath}}
|
|
||||||
).
|
|
||||||
|
|
||||||
-spec find_raw(emqx_utils_maps:config_key_path()) ->
|
-spec find_raw(emqx_utils_maps:config_key_path()) ->
|
||||||
{ok, term()} | {not_found, emqx_utils_maps:config_key_path(), term()}.
|
{ok, term()} | {not_found, emqx_utils_maps:config_key_path(), term()}.
|
||||||
|
@ -712,21 +713,14 @@ do_put(Type, Putter, [RootName | KeyPath], DeepValue) ->
|
||||||
NewValue = do_deep_put(Type, Putter, KeyPath, OldValue, DeepValue),
|
NewValue = do_deep_put(Type, Putter, KeyPath, OldValue, DeepValue),
|
||||||
persistent_term:put(?PERSIS_KEY(Type, RootName), NewValue).
|
persistent_term:put(?PERSIS_KEY(Type, RootName), NewValue).
|
||||||
|
|
||||||
do_deep_get(?CONF, KeyPath, Map, Default) ->
|
do_deep_get(?CONF, AtomKeyPath, Map, Default) ->
|
||||||
atom_conf_path(
|
emqx_utils_maps:deep_get(AtomKeyPath, Map, Default);
|
||||||
KeyPath,
|
|
||||||
fun(AtomKeyPath) -> emqx_utils_maps:deep_get(AtomKeyPath, Map, Default) end,
|
|
||||||
{return, Default}
|
|
||||||
);
|
|
||||||
do_deep_get(?RAW_CONF, KeyPath, Map, Default) ->
|
do_deep_get(?RAW_CONF, KeyPath, Map, Default) ->
|
||||||
emqx_utils_maps:deep_get([bin(Key) || Key <- KeyPath], Map, Default).
|
emqx_utils_maps:deep_get([bin(Key) || Key <- KeyPath], Map, Default).
|
||||||
|
|
||||||
do_deep_put(?CONF, Putter, KeyPath, Map, Value) ->
|
do_deep_put(?CONF, Putter, KeyPath, Map, Value) ->
|
||||||
atom_conf_path(
|
AtomKeyPath = ensure_atom_conf_path(KeyPath, {raise_error, {not_found, KeyPath}}),
|
||||||
KeyPath,
|
Putter(AtomKeyPath, Map, Value);
|
||||||
fun(AtomKeyPath) -> Putter(AtomKeyPath, Map, Value) end,
|
|
||||||
{raise_error, {not_found, KeyPath}}
|
|
||||||
);
|
|
||||||
do_deep_put(?RAW_CONF, Putter, KeyPath, Map, Value) ->
|
do_deep_put(?RAW_CONF, Putter, KeyPath, Map, Value) ->
|
||||||
Putter([bin(Key) || Key <- KeyPath], Map, Value).
|
Putter([bin(Key) || Key <- KeyPath], Map, Value).
|
||||||
|
|
||||||
|
@ -773,15 +767,24 @@ conf_key(?CONF, RootName) ->
|
||||||
conf_key(?RAW_CONF, RootName) ->
|
conf_key(?RAW_CONF, RootName) ->
|
||||||
bin(RootName).
|
bin(RootName).
|
||||||
|
|
||||||
atom_conf_path(Path, ExpFun, OnFail) ->
|
ensure_atom_conf_path(Path, OnFail) ->
|
||||||
try [atom(Key) || Key <- Path] of
|
case lists:all(fun erlang:is_atom/1, Path) of
|
||||||
AtomKeyPath -> ExpFun(AtomKeyPath)
|
true ->
|
||||||
|
%% Do not try to build new atom PATH if it already is.
|
||||||
|
Path;
|
||||||
|
_ ->
|
||||||
|
to_atom_conf_path(Path, OnFail)
|
||||||
|
end.
|
||||||
|
|
||||||
|
to_atom_conf_path(Path, OnFail) ->
|
||||||
|
try
|
||||||
|
[atom(Key) || Key <- Path]
|
||||||
catch
|
catch
|
||||||
error:badarg ->
|
error:badarg ->
|
||||||
case OnFail of
|
case OnFail of
|
||||||
{return, Val} ->
|
|
||||||
Val;
|
|
||||||
{raise_error, Err} ->
|
{raise_error, Err} ->
|
||||||
error(Err)
|
error(Err);
|
||||||
|
{return, V} ->
|
||||||
|
V
|
||||||
end
|
end
|
||||||
end.
|
end.
|
||||||
|
|
|
@ -158,9 +158,18 @@ dispatch(Group, Topic, Delivery = #delivery{message = Msg}, FailedSubs) ->
|
||||||
|
|
||||||
-spec strategy(emqx_topic:group()) -> strategy().
|
-spec strategy(emqx_topic:group()) -> strategy().
|
||||||
strategy(Group) ->
|
strategy(Group) ->
|
||||||
case emqx:get_config([broker, shared_subscription_group, Group, strategy], undefined) of
|
try
|
||||||
undefined -> emqx:get_config([broker, shared_subscription_strategy]);
|
emqx:get_config([
|
||||||
Strategy -> Strategy
|
broker,
|
||||||
|
shared_subscription_group,
|
||||||
|
binary_to_existing_atom(Group),
|
||||||
|
strategy
|
||||||
|
])
|
||||||
|
catch
|
||||||
|
error:{config_not_found, _} ->
|
||||||
|
get_default_shared_subscription_strategy();
|
||||||
|
error:badarg ->
|
||||||
|
get_default_shared_subscription_strategy()
|
||||||
end.
|
end.
|
||||||
|
|
||||||
-spec ack_enabled() -> boolean().
|
-spec ack_enabled() -> boolean().
|
||||||
|
@ -544,3 +553,6 @@ delete_route_if_needed({Group, Topic} = GroupTopic) ->
|
||||||
if_no_more_subscribers(GroupTopic, fun() ->
|
if_no_more_subscribers(GroupTopic, fun() ->
|
||||||
ok = emqx_router:do_delete_route(Topic, {Group, node()})
|
ok = emqx_router:do_delete_route(Topic, {Group, node()})
|
||||||
end).
|
end).
|
||||||
|
|
||||||
|
get_default_shared_subscription_strategy() ->
|
||||||
|
emqx:get_config([broker, shared_subscription_strategy]).
|
||||||
|
|
|
@ -156,6 +156,19 @@ t_cluster_nodes(_) ->
|
||||||
?assertEqual(Expected, emqx:cluster_nodes(cores)),
|
?assertEqual(Expected, emqx:cluster_nodes(cores)),
|
||||||
?assertEqual([], emqx:cluster_nodes(stopped)).
|
?assertEqual([], emqx:cluster_nodes(stopped)).
|
||||||
|
|
||||||
|
t_get_config(_) ->
|
||||||
|
?assertEqual(false, emqx:get_config([overload_protection, enable])),
|
||||||
|
?assertEqual(false, emqx:get_config(["overload_protection", <<"enable">>])).
|
||||||
|
|
||||||
|
t_get_config_default_1(_) ->
|
||||||
|
?assertEqual(false, emqx:get_config([overload_protection, enable], undefined)),
|
||||||
|
?assertEqual(false, emqx:get_config(["overload_protection", <<"enable">>], undefined)).
|
||||||
|
|
||||||
|
t_get_config_default_2(_) ->
|
||||||
|
AtomPathRes = emqx:get_config([overload_protection, <<"_!no_@exist_">>], undefined),
|
||||||
|
NonAtomPathRes = emqx:get_config(["doesnotexist", <<"db_backend">>], undefined),
|
||||||
|
?assertEqual(undefined, NonAtomPathRes),
|
||||||
|
?assertEqual(undefined, AtomPathRes).
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Hook fun
|
%% Hook fun
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
|
@ -805,7 +805,11 @@ with_listener(ListenerID, Fun) ->
|
||||||
find_listener(ListenerID) ->
|
find_listener(ListenerID) ->
|
||||||
case binary:split(ListenerID, <<":">>) of
|
case binary:split(ListenerID, <<":">>) of
|
||||||
[BType, BName] ->
|
[BType, BName] ->
|
||||||
case emqx_config:find([listeners, BType, BName]) of
|
case
|
||||||
|
emqx_config:find([
|
||||||
|
listeners, binary_to_existing_atom(BType), binary_to_existing_atom(BName)
|
||||||
|
])
|
||||||
|
of
|
||||||
{ok, _} ->
|
{ok, _} ->
|
||||||
{ok, {BType, BName}};
|
{ok, {BType, BName}};
|
||||||
{not_found, _, _} ->
|
{not_found, _, _} ->
|
||||||
|
|
|
@ -687,11 +687,15 @@ get_metrics_from_local_node(BridgeType, BridgeName) ->
|
||||||
).
|
).
|
||||||
|
|
||||||
is_enabled_bridge(BridgeType, BridgeName) ->
|
is_enabled_bridge(BridgeType, BridgeName) ->
|
||||||
try emqx:get_config([bridges, BridgeType, BridgeName]) of
|
try emqx:get_config([bridges, BridgeType, binary_to_existing_atom(BridgeName)]) of
|
||||||
ConfMap ->
|
ConfMap ->
|
||||||
maps:get(enable, ConfMap, false)
|
maps:get(enable, ConfMap, false)
|
||||||
catch
|
catch
|
||||||
error:{config_not_found, _} ->
|
error:{config_not_found, _} ->
|
||||||
|
throw(not_found);
|
||||||
|
error:badarg ->
|
||||||
|
%% catch non-existing atom,
|
||||||
|
%% none-existing atom means it is not available in config PT storage.
|
||||||
throw(not_found)
|
throw(not_found)
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
|
|
@ -38,8 +38,8 @@
|
||||||
]).
|
]).
|
||||||
|
|
||||||
start_link() ->
|
start_link() ->
|
||||||
MaxHistory = emqx_conf:get(["node", "cluster_call", "max_history"], 100),
|
MaxHistory = emqx_conf:get([node, cluster_call, max_history], 100),
|
||||||
CleanupMs = emqx_conf:get(["node", "cluster_call", "cleanup_interval"], 5 * 60 * 1000),
|
CleanupMs = emqx_conf:get([node, cluster_call, cleanup_interval], 5 * 60 * 1000),
|
||||||
start_link(MaxHistory, CleanupMs).
|
start_link(MaxHistory, CleanupMs).
|
||||||
|
|
||||||
start_link(MaxHistory, CleanupMs) ->
|
start_link(MaxHistory, CleanupMs) ->
|
||||||
|
|
|
@ -0,0 +1 @@
|
||||||
|
Optimize memory usage when accessing the configuration during runtime.
|
|
@ -0,0 +1,5 @@
|
||||||
|
在运行时降低读取配置的内存占用。
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -1,6 +1,6 @@
|
||||||
{application, emqx_ee_schema_registry, [
|
{application, emqx_ee_schema_registry, [
|
||||||
{description, "EMQX Schema Registry"},
|
{description, "EMQX Schema Registry"},
|
||||||
{vsn, "0.1.2"},
|
{vsn, "0.1.3"},
|
||||||
{registered, [emqx_ee_schema_registry_sup]},
|
{registered, [emqx_ee_schema_registry_sup]},
|
||||||
{mod, {emqx_ee_schema_registry_app, []}},
|
{mod, {emqx_ee_schema_registry_app, []}},
|
||||||
{applications, [
|
{applications, [
|
||||||
|
|
|
@ -58,7 +58,11 @@ get_serde(SchemaName) ->
|
||||||
|
|
||||||
-spec get_schema(schema_name()) -> {ok, map()} | {error, not_found}.
|
-spec get_schema(schema_name()) -> {ok, map()} | {error, not_found}.
|
||||||
get_schema(SchemaName) ->
|
get_schema(SchemaName) ->
|
||||||
case emqx_config:get([?CONF_KEY_ROOT, schemas, SchemaName], undefined) of
|
case
|
||||||
|
emqx_config:get(
|
||||||
|
[?CONF_KEY_ROOT, schemas, binary_to_atom(SchemaName)], undefined
|
||||||
|
)
|
||||||
|
of
|
||||||
undefined ->
|
undefined ->
|
||||||
{error, not_found};
|
{error, not_found};
|
||||||
Config ->
|
Config ->
|
||||||
|
|
Loading…
Reference in New Issue