chore: move reload_local_etc_config from emqx_conf to emqx_conf_cli
This commit is contained in:
parent
1ca1ba9e7a
commit
505d677685
|
@ -46,11 +46,10 @@
|
||||||
|
|
||||||
-export([schema/2]).
|
-export([schema/2]).
|
||||||
|
|
||||||
-define(MOD, module).
|
-define(MOD, '$mod').
|
||||||
-define(WKEY, '?').
|
-define(WKEY, '?').
|
||||||
|
|
||||||
-type handler_name() :: module().
|
-type handler_name() :: module().
|
||||||
-type handlers() :: #{emqx_config:config_key() => handlers(), ?MOD => handler_name()}.
|
|
||||||
|
|
||||||
-optional_callbacks([
|
-optional_callbacks([
|
||||||
pre_config_update/3,
|
pre_config_update/3,
|
||||||
|
@ -69,10 +68,7 @@
|
||||||
) ->
|
) ->
|
||||||
ok | {ok, Result :: any()} | {error, Reason :: term()}.
|
ok | {ok, Result :: any()} | {error, Reason :: term()}.
|
||||||
|
|
||||||
-type state() :: #{
|
-type state() :: #{handlers := any()}.
|
||||||
handlers := handlers(),
|
|
||||||
atom() => term()
|
|
||||||
}.
|
|
||||||
|
|
||||||
start_link() ->
|
start_link() ->
|
||||||
gen_server:start_link({local, ?MODULE}, ?MODULE, {}, []).
|
gen_server:start_link({local, ?MODULE}, ?MODULE, {}, []).
|
||||||
|
|
|
@ -382,7 +382,7 @@ fields("persistent_table_mria_opts") ->
|
||||||
];
|
];
|
||||||
fields("persistent_session_builtin") ->
|
fields("persistent_session_builtin") ->
|
||||||
[
|
[
|
||||||
{"type", sc(hoconsc:enum([builtin]), #{default => builtin, desc => ""})},
|
{"type", sc(hoconsc:enum([builtin]), #{default => <<"builtin">>, desc => ""})},
|
||||||
{"session",
|
{"session",
|
||||||
sc(ref("persistent_table_mria_opts"), #{
|
sc(ref("persistent_table_mria_opts"), #{
|
||||||
desc => ?DESC(persistent_session_builtin_session_table)
|
desc => ?DESC(persistent_session_builtin_session_table)
|
||||||
|
@ -548,7 +548,7 @@ fields("mqtt") ->
|
||||||
sc(
|
sc(
|
||||||
hoconsc:union([integer(), disabled]),
|
hoconsc:union([integer(), disabled]),
|
||||||
#{
|
#{
|
||||||
default => disabled,
|
default => <<"disabled">>,
|
||||||
desc => ?DESC(mqtt_server_keepalive)
|
desc => ?DESC(mqtt_server_keepalive)
|
||||||
}
|
}
|
||||||
)},
|
)},
|
||||||
|
@ -575,7 +575,7 @@ fields("mqtt") ->
|
||||||
sc(
|
sc(
|
||||||
hoconsc:union([range(1, inf), infinity]),
|
hoconsc:union([range(1, inf), infinity]),
|
||||||
#{
|
#{
|
||||||
default => infinity,
|
default => <<"infinity">>,
|
||||||
desc => ?DESC(mqtt_max_subscriptions)
|
desc => ?DESC(mqtt_max_subscriptions)
|
||||||
}
|
}
|
||||||
)},
|
)},
|
||||||
|
@ -639,7 +639,7 @@ fields("mqtt") ->
|
||||||
sc(
|
sc(
|
||||||
hoconsc:union([disabled, map()]),
|
hoconsc:union([disabled, map()]),
|
||||||
#{
|
#{
|
||||||
default => disabled,
|
default => <<"disabled">>,
|
||||||
desc => ?DESC(mqtt_mqueue_priorities)
|
desc => ?DESC(mqtt_mqueue_priorities)
|
||||||
}
|
}
|
||||||
)},
|
)},
|
||||||
|
@ -647,7 +647,7 @@ fields("mqtt") ->
|
||||||
sc(
|
sc(
|
||||||
hoconsc:enum([highest, lowest]),
|
hoconsc:enum([highest, lowest]),
|
||||||
#{
|
#{
|
||||||
default => lowest,
|
default => <<"lowest">>,
|
||||||
desc => ?DESC(mqtt_mqueue_default_priority)
|
desc => ?DESC(mqtt_mqueue_default_priority)
|
||||||
}
|
}
|
||||||
)},
|
)},
|
||||||
|
@ -671,7 +671,7 @@ fields("mqtt") ->
|
||||||
sc(
|
sc(
|
||||||
hoconsc:enum([disabled, cn, dn, crt, pem, md5]),
|
hoconsc:enum([disabled, cn, dn, crt, pem, md5]),
|
||||||
#{
|
#{
|
||||||
default => disabled,
|
default => <<"disabled">>,
|
||||||
desc => ?DESC(mqtt_peer_cert_as_username)
|
desc => ?DESC(mqtt_peer_cert_as_username)
|
||||||
}
|
}
|
||||||
)},
|
)},
|
||||||
|
@ -679,7 +679,7 @@ fields("mqtt") ->
|
||||||
sc(
|
sc(
|
||||||
hoconsc:enum([disabled, cn, dn, crt, pem, md5]),
|
hoconsc:enum([disabled, cn, dn, crt, pem, md5]),
|
||||||
#{
|
#{
|
||||||
default => disabled,
|
default => <<"disabled">>,
|
||||||
desc => ?DESC(mqtt_peer_cert_as_clientid)
|
desc => ?DESC(mqtt_peer_cert_as_clientid)
|
||||||
}
|
}
|
||||||
)}
|
)}
|
||||||
|
@ -1224,7 +1224,7 @@ fields("ws_opts") ->
|
||||||
sc(
|
sc(
|
||||||
hoconsc:enum([single, multiple]),
|
hoconsc:enum([single, multiple]),
|
||||||
#{
|
#{
|
||||||
default => multiple,
|
default => <<"multiple">>,
|
||||||
desc => ?DESC(fields_ws_opts_mqtt_piggyback)
|
desc => ?DESC(fields_ws_opts_mqtt_piggyback)
|
||||||
}
|
}
|
||||||
)},
|
)},
|
||||||
|
@ -1248,7 +1248,7 @@ fields("ws_opts") ->
|
||||||
sc(
|
sc(
|
||||||
hoconsc:union([infinity, integer()]),
|
hoconsc:union([infinity, integer()]),
|
||||||
#{
|
#{
|
||||||
default => infinity,
|
default => <<"infinity">>,
|
||||||
desc => ?DESC(fields_ws_opts_max_frame_size)
|
desc => ?DESC(fields_ws_opts_max_frame_size)
|
||||||
}
|
}
|
||||||
)},
|
)},
|
||||||
|
@ -1506,7 +1506,7 @@ fields("deflate_opts") ->
|
||||||
sc(
|
sc(
|
||||||
hoconsc:enum([default, filtered, huffman_only, rle]),
|
hoconsc:enum([default, filtered, huffman_only, rle]),
|
||||||
#{
|
#{
|
||||||
default => default,
|
default => <<"default">>,
|
||||||
desc => ?DESC(fields_deflate_opts_strategy)
|
desc => ?DESC(fields_deflate_opts_strategy)
|
||||||
}
|
}
|
||||||
)},
|
)},
|
||||||
|
@ -1514,7 +1514,7 @@ fields("deflate_opts") ->
|
||||||
sc(
|
sc(
|
||||||
hoconsc:enum([takeover, no_takeover]),
|
hoconsc:enum([takeover, no_takeover]),
|
||||||
#{
|
#{
|
||||||
default => takeover,
|
default => <<"takeover">>,
|
||||||
desc => ?DESC(fields_deflate_opts_server_context_takeover)
|
desc => ?DESC(fields_deflate_opts_server_context_takeover)
|
||||||
}
|
}
|
||||||
)},
|
)},
|
||||||
|
@ -1522,7 +1522,7 @@ fields("deflate_opts") ->
|
||||||
sc(
|
sc(
|
||||||
hoconsc:enum([takeover, no_takeover]),
|
hoconsc:enum([takeover, no_takeover]),
|
||||||
#{
|
#{
|
||||||
default => takeover,
|
default => <<"takeover">>,
|
||||||
desc => ?DESC(fields_deflate_opts_client_context_takeover)
|
desc => ?DESC(fields_deflate_opts_client_context_takeover)
|
||||||
}
|
}
|
||||||
)},
|
)},
|
||||||
|
@ -1557,7 +1557,7 @@ fields("broker") ->
|
||||||
sc(
|
sc(
|
||||||
hoconsc:enum([local, leader, quorum, all]),
|
hoconsc:enum([local, leader, quorum, all]),
|
||||||
#{
|
#{
|
||||||
default => quorum,
|
default => <<"quorum">>,
|
||||||
desc => ?DESC(broker_session_locking_strategy)
|
desc => ?DESC(broker_session_locking_strategy)
|
||||||
}
|
}
|
||||||
)},
|
)},
|
||||||
|
@ -1573,7 +1573,7 @@ fields("broker") ->
|
||||||
hash_clientid
|
hash_clientid
|
||||||
]),
|
]),
|
||||||
#{
|
#{
|
||||||
default => round_robin,
|
default => <<"round_robin">>,
|
||||||
desc => ?DESC(broker_shared_subscription_strategy)
|
desc => ?DESC(broker_shared_subscription_strategy)
|
||||||
}
|
}
|
||||||
)},
|
)},
|
||||||
|
@ -1626,7 +1626,7 @@ fields("shared_subscription_group") ->
|
||||||
hash_clientid
|
hash_clientid
|
||||||
]),
|
]),
|
||||||
#{
|
#{
|
||||||
default => random,
|
default => <<"random">>,
|
||||||
desc => ?DESC(shared_subscription_strategy_enum)
|
desc => ?DESC(shared_subscription_strategy_enum)
|
||||||
}
|
}
|
||||||
)}
|
)}
|
||||||
|
@ -1637,7 +1637,7 @@ fields("broker_perf") ->
|
||||||
sc(
|
sc(
|
||||||
hoconsc:enum([key, tab, global]),
|
hoconsc:enum([key, tab, global]),
|
||||||
#{
|
#{
|
||||||
default => key,
|
default => <<"key">>,
|
||||||
desc => ?DESC(broker_perf_route_lock_type)
|
desc => ?DESC(broker_perf_route_lock_type)
|
||||||
}
|
}
|
||||||
)},
|
)},
|
||||||
|
@ -1759,7 +1759,7 @@ fields("sysmon_vm") ->
|
||||||
sc(
|
sc(
|
||||||
hoconsc:union([disabled, duration()]),
|
hoconsc:union([disabled, duration()]),
|
||||||
#{
|
#{
|
||||||
default => disabled,
|
default => <<"disabled">>,
|
||||||
desc => ?DESC(sysmon_vm_long_gc)
|
desc => ?DESC(sysmon_vm_long_gc)
|
||||||
}
|
}
|
||||||
)},
|
)},
|
||||||
|
@ -1959,7 +1959,7 @@ fields("trace") ->
|
||||||
[
|
[
|
||||||
{"payload_encode",
|
{"payload_encode",
|
||||||
sc(hoconsc:enum([hex, text, hidden]), #{
|
sc(hoconsc:enum([hex, text, hidden]), #{
|
||||||
default => text,
|
default => <<"text">>,
|
||||||
deprecated => {since, "5.0.22"},
|
deprecated => {since, "5.0.22"},
|
||||||
importance => ?IMPORTANCE_HIDDEN,
|
importance => ?IMPORTANCE_HIDDEN,
|
||||||
desc => ?DESC(fields_trace_payload_encode)
|
desc => ?DESC(fields_trace_payload_encode)
|
||||||
|
@ -2048,7 +2048,7 @@ base_listener(Bind) ->
|
||||||
atom(),
|
atom(),
|
||||||
#{
|
#{
|
||||||
desc => ?DESC(base_listener_zone),
|
desc => ?DESC(base_listener_zone),
|
||||||
default => 'default'
|
default => <<"default">>
|
||||||
}
|
}
|
||||||
)},
|
)},
|
||||||
{"limiter",
|
{"limiter",
|
||||||
|
@ -2283,7 +2283,7 @@ common_ssl_opts_schema(Defaults, Type) ->
|
||||||
sc(
|
sc(
|
||||||
hoconsc:enum([verify_peer, verify_none]),
|
hoconsc:enum([verify_peer, verify_none]),
|
||||||
#{
|
#{
|
||||||
default => Df("verify", verify_none),
|
default => Df("verify", <<"verify_none">>),
|
||||||
desc => ?DESC(common_ssl_opts_schema_verify)
|
desc => ?DESC(common_ssl_opts_schema_verify)
|
||||||
}
|
}
|
||||||
)},
|
)},
|
||||||
|
@ -2351,7 +2351,7 @@ common_ssl_opts_schema(Defaults, Type) ->
|
||||||
emergency, alert, critical, error, warning, notice, info, debug, none, all
|
emergency, alert, critical, error, warning, notice, info, debug, none, all
|
||||||
]),
|
]),
|
||||||
#{
|
#{
|
||||||
default => notice,
|
default => <<"notice">>,
|
||||||
desc => ?DESC(common_ssl_opts_schema_log_level),
|
desc => ?DESC(common_ssl_opts_schema_log_level),
|
||||||
importance => ?IMPORTANCE_LOW
|
importance => ?IMPORTANCE_LOW
|
||||||
}
|
}
|
||||||
|
@ -2611,7 +2611,7 @@ authz_fields() ->
|
||||||
sc(
|
sc(
|
||||||
hoconsc:enum([allow, deny]),
|
hoconsc:enum([allow, deny]),
|
||||||
#{
|
#{
|
||||||
default => allow,
|
default => <<"allow">>,
|
||||||
required => true,
|
required => true,
|
||||||
desc => ?DESC(fields_authorization_no_match)
|
desc => ?DESC(fields_authorization_no_match)
|
||||||
}
|
}
|
||||||
|
@ -2620,7 +2620,7 @@ authz_fields() ->
|
||||||
sc(
|
sc(
|
||||||
hoconsc:enum([ignore, disconnect]),
|
hoconsc:enum([ignore, disconnect]),
|
||||||
#{
|
#{
|
||||||
default => ignore,
|
default => <<"ignore">>,
|
||||||
required => true,
|
required => true,
|
||||||
desc => ?DESC(fields_authorization_deny_action)
|
desc => ?DESC(fields_authorization_deny_action)
|
||||||
}
|
}
|
||||||
|
|
|
@ -286,9 +286,9 @@ perform_sanity_checks(_App) ->
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
ensure_config_handler(Module, ConfigPath) ->
|
ensure_config_handler(Module, ConfigPath) ->
|
||||||
#{handlers := Handlers} = sys:get_state(emqx_config_handler),
|
#{handlers := Handlers} = emqx_config_handler:info(),
|
||||||
case emqx_utils_maps:deep_get(ConfigPath, Handlers, not_found) of
|
case emqx_utils_maps:deep_get(ConfigPath, Handlers, not_found) of
|
||||||
#{{mod} := Module} -> ok;
|
#{'$mod' := Module} -> ok;
|
||||||
NotFound -> error({config_handler_missing, ConfigPath, Module, NotFound})
|
NotFound -> error({config_handler_missing, ConfigPath, Module, NotFound})
|
||||||
end,
|
end,
|
||||||
ok.
|
ok.
|
||||||
|
|
|
@ -19,7 +19,7 @@
|
||||||
-compile(export_all).
|
-compile(export_all).
|
||||||
-compile(nowarn_export_all).
|
-compile(nowarn_export_all).
|
||||||
|
|
||||||
-define(MOD, {mod}).
|
-define(MOD, '$mod').
|
||||||
-define(WKEY, '?').
|
-define(WKEY, '?').
|
||||||
-define(CLUSTER_CONF, "/tmp/cluster.conf").
|
-define(CLUSTER_CONF, "/tmp/cluster.conf").
|
||||||
|
|
||||||
|
|
|
@ -31,7 +31,6 @@
|
||||||
-export([dump_schema/2]).
|
-export([dump_schema/2]).
|
||||||
-export([schema_module/0]).
|
-export([schema_module/0]).
|
||||||
-export([gen_example_conf/2]).
|
-export([gen_example_conf/2]).
|
||||||
-export([reload_etc_conf_on_local_node/0]).
|
|
||||||
-export([check_config/2]).
|
-export([check_config/2]).
|
||||||
|
|
||||||
%% TODO: move to emqx_dashboard when we stop building api schema at build time
|
%% TODO: move to emqx_dashboard when we stop building api schema at build time
|
||||||
|
@ -216,94 +215,6 @@ schema_module() ->
|
||||||
Value -> list_to_existing_atom(Value)
|
Value -> list_to_existing_atom(Value)
|
||||||
end.
|
end.
|
||||||
|
|
||||||
%% @doc Reload etc/emqx.conf to runtime config except for the readonly config
|
|
||||||
-spec reload_etc_conf_on_local_node() -> ok | {error, term()}.
|
|
||||||
reload_etc_conf_on_local_node() ->
|
|
||||||
case load_etc_config_file() of
|
|
||||||
{ok, RawConf} ->
|
|
||||||
case check_readonly_config(RawConf) of
|
|
||||||
{ok, Reloaded} -> reload_config(Reloaded);
|
|
||||||
{error, Error} -> {error, Error}
|
|
||||||
end;
|
|
||||||
{error, _Error} ->
|
|
||||||
{error, bad_hocon_file}
|
|
||||||
end.
|
|
||||||
|
|
||||||
reload_config(AllConf) ->
|
|
||||||
Func = fun(Key, Conf, Acc) ->
|
|
||||||
case emqx:update_config([Key], Conf, #{persistent => false}) of
|
|
||||||
{ok, _} ->
|
|
||||||
io:format("Reloaded ~ts config ok~n", [Key]),
|
|
||||||
Acc;
|
|
||||||
Error ->
|
|
||||||
?ELOG("Reloaded ~ts config failed~n~p~n", [Key, Error]),
|
|
||||||
?SLOG(error, #{
|
|
||||||
msg => "failed_to_reload_etc_config",
|
|
||||||
key => Key,
|
|
||||||
value => Conf,
|
|
||||||
error => Error
|
|
||||||
}),
|
|
||||||
Acc#{Key => Error}
|
|
||||||
end
|
|
||||||
end,
|
|
||||||
Res = maps:fold(Func, #{}, AllConf),
|
|
||||||
case Res =:= #{} of
|
|
||||||
true -> ok;
|
|
||||||
false -> {error, Res}
|
|
||||||
end.
|
|
||||||
|
|
||||||
%% @doc Merge etc/emqx.conf on top of cluster.hocon.
|
|
||||||
%% For example:
|
|
||||||
%% `authorization.sources` will be merged into cluster.hocon when updated via dashboard,
|
|
||||||
%% but `authorization.sources` in not in the default emqx.conf file.
|
|
||||||
%% To make sure all root keys in emqx.conf has a fully merged value.
|
|
||||||
load_etc_config_file() ->
|
|
||||||
ConfFiles = emqx_config:config_files(),
|
|
||||||
Opts = #{format => map, include_dirs => emqx_config:include_dirs()},
|
|
||||||
case hocon:files(ConfFiles, Opts) of
|
|
||||||
{ok, RawConf} ->
|
|
||||||
HasDeprecatedFile = emqx_config:has_deprecated_file(),
|
|
||||||
%% Merge etc.conf on top of cluster.hocon,
|
|
||||||
%% Don't use map deep_merge, use hocon files merge instead.
|
|
||||||
%% In order to have a chance to delete. (e.g. zones.zone1.mqtt = null)
|
|
||||||
Keys = maps:keys(RawConf),
|
|
||||||
MergedRaw = emqx_config:load_config_files(HasDeprecatedFile, ConfFiles),
|
|
||||||
{ok, maps:with(Keys, MergedRaw)};
|
|
||||||
{error, Error} ->
|
|
||||||
?SLOG(error, #{
|
|
||||||
msg => "failed_to_read_etc_config",
|
|
||||||
files => ConfFiles,
|
|
||||||
error => Error
|
|
||||||
}),
|
|
||||||
{error, Error}
|
|
||||||
end.
|
|
||||||
|
|
||||||
check_readonly_config(Raw) ->
|
|
||||||
SchemaMod = schema_module(),
|
|
||||||
RawDefault = emqx_config:fill_defaults(Raw),
|
|
||||||
case check_config(SchemaMod, RawDefault) of
|
|
||||||
{ok, CheckedConf} ->
|
|
||||||
case
|
|
||||||
lists:filtermap(fun(Key) -> filter_changed(Key, CheckedConf) end, ?READONLY_KEYS)
|
|
||||||
of
|
|
||||||
[] ->
|
|
||||||
{ok, maps:without([atom_to_binary(K) || K <- ?READONLY_KEYS], Raw)};
|
|
||||||
Error ->
|
|
||||||
?SLOG(error, #{
|
|
||||||
msg => "failed_to_change_read_only_key_in_etc_config",
|
|
||||||
read_only_keys => ?READONLY_KEYS,
|
|
||||||
error => Error
|
|
||||||
}),
|
|
||||||
{error, Error}
|
|
||||||
end;
|
|
||||||
{error, Error} ->
|
|
||||||
?SLOG(error, #{
|
|
||||||
msg => "failed_to_check_etc_config",
|
|
||||||
error => Error
|
|
||||||
}),
|
|
||||||
{error, Error}
|
|
||||||
end.
|
|
||||||
|
|
||||||
check_config(Mod, Raw) ->
|
check_config(Mod, Raw) ->
|
||||||
try
|
try
|
||||||
{_AppEnvs, CheckedConf} = emqx_config:check_config(Mod, Raw),
|
{_AppEnvs, CheckedConf} = emqx_config:check_config(Mod, Raw),
|
||||||
|
@ -317,18 +228,6 @@ check_config(Mod, Raw) ->
|
||||||
%% Internal functions
|
%% Internal functions
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
filter_changed(Key, ChangedConf) ->
|
|
||||||
Prev = get([Key], #{}),
|
|
||||||
New = maps:get(Key, ChangedConf, #{}),
|
|
||||||
case Prev =/= New of
|
|
||||||
true -> {true, {Key, changed(New, Prev)}};
|
|
||||||
false -> false
|
|
||||||
end.
|
|
||||||
|
|
||||||
changed(New, Prev) ->
|
|
||||||
Diff = emqx_utils_maps:diff_maps(New, Prev),
|
|
||||||
maps:filter(fun(_Key, Value) -> Value =/= #{} end, maps:remove(identical, Diff)).
|
|
||||||
|
|
||||||
%% @doc Make a resolver function that can be used to lookup the description by hocon_schema_json dump.
|
%% @doc Make a resolver function that can be used to lookup the description by hocon_schema_json dump.
|
||||||
make_desc_resolver(Lang) ->
|
make_desc_resolver(Lang) ->
|
||||||
fun
|
fun
|
||||||
|
|
|
@ -31,6 +31,7 @@
|
||||||
%% kept cluster_call for compatibility
|
%% kept cluster_call for compatibility
|
||||||
-define(CLUSTER_CALL, cluster_call).
|
-define(CLUSTER_CALL, cluster_call).
|
||||||
-define(CONF, conf).
|
-define(CONF, conf).
|
||||||
|
-define(UPDATE_READONLY_KEYS_PROHIBITED, "update_readonly_keys_prohibited").
|
||||||
|
|
||||||
load() ->
|
load() ->
|
||||||
emqx_ctl:register_command(?CLUSTER_CALL, {?MODULE, admins}, [hidden]),
|
emqx_ctl:register_command(?CLUSTER_CALL, {?MODULE, admins}, [hidden]),
|
||||||
|
@ -55,7 +56,7 @@ conf(["load", Path]) ->
|
||||||
conf(["cluster_sync" | Args]) ->
|
conf(["cluster_sync" | Args]) ->
|
||||||
admins(Args);
|
admins(Args);
|
||||||
conf(["reload"]) ->
|
conf(["reload"]) ->
|
||||||
emqx_conf:reload_etc_conf_on_local_node();
|
reload_etc_conf_on_local_node();
|
||||||
conf(_) ->
|
conf(_) ->
|
||||||
emqx_ctl:usage(usage_conf() ++ usage_sync()).
|
emqx_ctl:usage(usage_conf() ++ usage_sync()).
|
||||||
|
|
||||||
|
@ -190,16 +191,25 @@ load_config(Path, AuthChain) ->
|
||||||
{ok, RawConf} ->
|
{ok, RawConf} ->
|
||||||
case check_config(RawConf) of
|
case check_config(RawConf) of
|
||||||
ok ->
|
ok ->
|
||||||
maps:foreach(fun(K, V) -> update_config(K, V, AuthChain) end, RawConf);
|
lists:foreach(
|
||||||
{error, Reason} when is_list(Reason) ->
|
fun({K, V}) -> update_config(K, V, AuthChain) end,
|
||||||
|
to_sorted_list(RawConf)
|
||||||
|
);
|
||||||
|
{error, ?UPDATE_READONLY_KEYS_PROHIBITED = Reason} ->
|
||||||
emqx_ctl:warning("load ~ts failed~n~ts~n", [Path, Reason]),
|
emqx_ctl:warning("load ~ts failed~n~ts~n", [Path, Reason]),
|
||||||
emqx_ctl:warning(
|
emqx_ctl:warning(
|
||||||
"Maybe try `emqx_ctl conf reload` to reload etc/emqx.conf on local node~n"
|
"Maybe try `emqx_ctl conf reload` to reload etc/emqx.conf on local node~n"
|
||||||
),
|
),
|
||||||
{error, Reason};
|
{error, Reason};
|
||||||
{error, Reason} when is_map(Reason) ->
|
{error, Errors} ->
|
||||||
emqx_ctl:warning("load ~ts schema check failed~n~p~n", [Path, Reason]),
|
emqx_ctl:warning("load ~ts schema check failed~n", [Path]),
|
||||||
{error, Reason}
|
lists:foreach(
|
||||||
|
fun({Key, Error}) ->
|
||||||
|
emqx_ctl:warning("~ts: ~p~n", [Key, Error])
|
||||||
|
end,
|
||||||
|
Errors
|
||||||
|
),
|
||||||
|
{error, Errors}
|
||||||
end;
|
end;
|
||||||
{error, Reason} ->
|
{error, Reason} ->
|
||||||
emqx_ctl:warning("load ~ts failed~n~p~n", [Path, Reason]),
|
emqx_ctl:warning("load ~ts failed~n~p~n", [Path, Reason]),
|
||||||
|
@ -207,14 +217,11 @@ load_config(Path, AuthChain) ->
|
||||||
end.
|
end.
|
||||||
|
|
||||||
update_config(?EMQX_AUTHORIZATION_CONFIG_ROOT_NAME = Key, Conf, "merge") ->
|
update_config(?EMQX_AUTHORIZATION_CONFIG_ROOT_NAME = Key, Conf, "merge") ->
|
||||||
Res = emqx_authz:merge(Conf),
|
check_res(Key, emqx_authz:merge(Conf));
|
||||||
check_res(Key, Res);
|
|
||||||
update_config(?EMQX_AUTHENTICATION_CONFIG_ROOT_NAME = Key, Conf, "merge") ->
|
update_config(?EMQX_AUTHENTICATION_CONFIG_ROOT_NAME = Key, Conf, "merge") ->
|
||||||
Res = emqx_authn:merge_config(Conf),
|
check_res(Key, emqx_authn:merge_config(Conf));
|
||||||
check_res(Key, Res);
|
|
||||||
update_config(Key, Value, _) ->
|
update_config(Key, Value, _) ->
|
||||||
Res = emqx_conf:update([Key], Value, ?OPTIONS),
|
check_res(Key, emqx_conf:update([Key], Value, ?OPTIONS)).
|
||||||
check_res(Key, Res).
|
|
||||||
|
|
||||||
check_res(Key, {ok, _}) -> emqx_ctl:print("load ~ts in cluster ok~n", [Key]);
|
check_res(Key, {ok, _}) -> emqx_ctl:print("load ~ts in cluster ok~n", [Key]);
|
||||||
check_res(Key, {error, Reason}) -> emqx_ctl:warning("load ~ts failed~n~p~n", [Key, Reason]).
|
check_res(Key, {error, Reason}) -> emqx_ctl:warning("load ~ts failed~n~p~n", [Key, Reason]).
|
||||||
|
@ -230,24 +237,123 @@ check_keys_is_not_readonly(Conf) ->
|
||||||
ReadOnlyKeys = [atom_to_binary(K) || K <- ?READONLY_KEYS],
|
ReadOnlyKeys = [atom_to_binary(K) || K <- ?READONLY_KEYS],
|
||||||
case ReadOnlyKeys -- Keys of
|
case ReadOnlyKeys -- Keys of
|
||||||
ReadOnlyKeys -> ok;
|
ReadOnlyKeys -> ok;
|
||||||
_ -> {error, "update_readonly_keys_prohibited"}
|
_ -> {error, ?UPDATE_READONLY_KEYS_PROHIBITED}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
check_config_schema(Conf) ->
|
check_config_schema(Conf) ->
|
||||||
SchemaMod = emqx_conf:schema_module(),
|
SchemaMod = emqx_conf:schema_module(),
|
||||||
Res =
|
Fold = fun({Key, Value}, Acc) ->
|
||||||
maps:fold(
|
|
||||||
fun(Key, Value, Acc) ->
|
|
||||||
Schema = emqx_config_handler:schema(SchemaMod, [Key]),
|
Schema = emqx_config_handler:schema(SchemaMod, [Key]),
|
||||||
case emqx_conf:check_config(Schema, #{Key => Value}) of
|
case emqx_conf:check_config(Schema, #{Key => Value}) of
|
||||||
{ok, _} -> Acc;
|
{ok, _} -> Acc;
|
||||||
{error, Reason} -> #{Key => Reason}
|
{error, Reason} -> [{Key, Reason} | Acc]
|
||||||
end
|
end
|
||||||
end,
|
end,
|
||||||
#{},
|
sorted_fold(Fold, Conf).
|
||||||
Conf
|
|
||||||
),
|
%% @doc Reload etc/emqx.conf to runtime config except for the readonly config
|
||||||
case Res =:= #{} of
|
-spec reload_etc_conf_on_local_node() -> ok | {error, term()}.
|
||||||
true -> ok;
|
reload_etc_conf_on_local_node() ->
|
||||||
false -> {error, Res}
|
case load_etc_config_file() of
|
||||||
|
{ok, RawConf} ->
|
||||||
|
case check_readonly_config(RawConf) of
|
||||||
|
{ok, Reloaded} -> reload_config(Reloaded);
|
||||||
|
{error, Error} -> {error, Error}
|
||||||
|
end;
|
||||||
|
{error, _Error} ->
|
||||||
|
{error, bad_hocon_file}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
%% @doc Merge etc/emqx.conf on top of cluster.hocon.
|
||||||
|
%% For example:
|
||||||
|
%% `authorization.sources` will be merged into cluster.hocon when updated via dashboard,
|
||||||
|
%% but `authorization.sources` in not in the default emqx.conf file.
|
||||||
|
%% To make sure all root keys in emqx.conf has a fully merged value.
|
||||||
|
load_etc_config_file() ->
|
||||||
|
ConfFiles = emqx_config:config_files(),
|
||||||
|
Opts = #{format => map, include_dirs => emqx_config:include_dirs()},
|
||||||
|
case hocon:files(ConfFiles, Opts) of
|
||||||
|
{ok, RawConf} ->
|
||||||
|
HasDeprecatedFile = emqx_config:has_deprecated_file(),
|
||||||
|
%% Merge etc.conf on top of cluster.hocon,
|
||||||
|
%% Don't use map deep_merge, use hocon files merge instead.
|
||||||
|
%% In order to have a chance to delete. (e.g. zones.zone1.mqtt = null)
|
||||||
|
Keys = maps:keys(RawConf),
|
||||||
|
MergedRaw = emqx_config:load_config_files(HasDeprecatedFile, ConfFiles),
|
||||||
|
{ok, maps:with(Keys, MergedRaw)};
|
||||||
|
{error, Error} ->
|
||||||
|
?SLOG(error, #{
|
||||||
|
msg => "failed_to_read_etc_config",
|
||||||
|
files => ConfFiles,
|
||||||
|
error => Error
|
||||||
|
}),
|
||||||
|
{error, Error}
|
||||||
|
end.
|
||||||
|
|
||||||
|
check_readonly_config(Raw) ->
|
||||||
|
SchemaMod = emqx_conf:schema_module(),
|
||||||
|
RawDefault = emqx_config:fill_defaults(Raw),
|
||||||
|
case emqx_conf:check_config(SchemaMod, RawDefault) of
|
||||||
|
{ok, CheckedConf} ->
|
||||||
|
case filter_changed_readonly_keys(CheckedConf) of
|
||||||
|
[] ->
|
||||||
|
ReadOnlyKeys = [atom_to_binary(K) || K <- ?READONLY_KEYS],
|
||||||
|
{ok, maps:without(ReadOnlyKeys, Raw)};
|
||||||
|
Error ->
|
||||||
|
?SLOG(error, #{
|
||||||
|
msg => ?UPDATE_READONLY_KEYS_PROHIBITED,
|
||||||
|
read_only_keys => ?READONLY_KEYS,
|
||||||
|
error => Error
|
||||||
|
}),
|
||||||
|
{error, Error}
|
||||||
|
end;
|
||||||
|
{error, Error} ->
|
||||||
|
?SLOG(error, #{
|
||||||
|
msg => "bad_etc_config_schema_found",
|
||||||
|
error => Error
|
||||||
|
}),
|
||||||
|
{error, Error}
|
||||||
|
end.
|
||||||
|
|
||||||
|
reload_config(AllConf) ->
|
||||||
|
Fold = fun({Key, Conf}, Acc) ->
|
||||||
|
case emqx:update_config([Key], Conf, #{persistent => false}) of
|
||||||
|
{ok, _} ->
|
||||||
|
emqx_ctl:print("Reloaded ~ts config ok~n", [Key]),
|
||||||
|
Acc;
|
||||||
|
Error ->
|
||||||
|
emqx_ctl:warning("Reloaded ~ts config failed~n~p~n", [Key, Error]),
|
||||||
|
?SLOG(error, #{
|
||||||
|
msg => "failed_to_reload_etc_config",
|
||||||
|
key => Key,
|
||||||
|
value => Conf,
|
||||||
|
error => Error
|
||||||
|
}),
|
||||||
|
[{Key, Error} | Acc]
|
||||||
|
end
|
||||||
|
end,
|
||||||
|
sorted_fold(Fold, AllConf).
|
||||||
|
|
||||||
|
filter_changed_readonly_keys(Conf) ->
|
||||||
|
lists:filtermap(fun(Key) -> filter_changed(Key, Conf) end, ?READONLY_KEYS).
|
||||||
|
|
||||||
|
filter_changed(Key, ChangedConf) ->
|
||||||
|
Prev = emqx_conf:get([Key], #{}),
|
||||||
|
New = maps:get(Key, ChangedConf, #{}),
|
||||||
|
case Prev =/= New of
|
||||||
|
true -> {true, {Key, changed(New, Prev)}};
|
||||||
|
false -> false
|
||||||
|
end.
|
||||||
|
|
||||||
|
changed(New, Prev) ->
|
||||||
|
Diff = emqx_utils_maps:diff_maps(New, Prev),
|
||||||
|
maps:filter(fun(_Key, Value) -> Value =/= #{} end, maps:remove(identical, Diff)).
|
||||||
|
|
||||||
|
sorted_fold(Func, Conf) ->
|
||||||
|
case lists:foldl(Func, [], to_sorted_list(Conf)) of
|
||||||
|
[] -> ok;
|
||||||
|
Error -> {error, Error}
|
||||||
|
end.
|
||||||
|
|
||||||
|
to_sorted_list(Conf) ->
|
||||||
|
lists:keysort(1, maps:to_list(Conf)).
|
||||||
|
|
|
@ -40,18 +40,21 @@ t_load_config(Config) ->
|
||||||
ConfBin0 = hocon_pp:do(#{<<"authorization">> => Conf#{<<"sources">> => []}}, #{}),
|
ConfBin0 = hocon_pp:do(#{<<"authorization">> => Conf#{<<"sources">> => []}}, #{}),
|
||||||
ConfFile0 = prepare_conf_file(?FUNCTION_NAME, ConfBin0, Config),
|
ConfFile0 = prepare_conf_file(?FUNCTION_NAME, ConfBin0, Config),
|
||||||
ok = emqx_conf_cli:conf(["load", ConfFile0]),
|
ok = emqx_conf_cli:conf(["load", ConfFile0]),
|
||||||
?assertEqual(Conf#{<<"sources">> := []}, emqx_conf:get_raw([Authz])),
|
?assertEqual(Conf#{<<"sources">> => []}, emqx_conf:get_raw([Authz])),
|
||||||
%% remove sources, it will reset to default file source.
|
%% remove sources, it will reset to default file source.
|
||||||
ConfBin1 = hocon_pp:do(#{<<"authorization">> => maps:remove(<<"sources">>, Conf)}, #{}),
|
ConfBin1 = hocon_pp:do(#{<<"authorization">> => maps:remove(<<"sources">>, Conf)}, #{}),
|
||||||
ConfFile1 = prepare_conf_file(?FUNCTION_NAME, ConfBin1, Config),
|
ConfFile1 = prepare_conf_file(?FUNCTION_NAME, ConfBin1, Config),
|
||||||
ok = emqx_conf_cli:conf(["load", ConfFile1]),
|
ok = emqx_conf_cli:conf(["load", ConfFile1]),
|
||||||
Default = [emqx_authz_schema:default_authz()],
|
Default = [emqx_authz_schema:default_authz()],
|
||||||
?assertEqual(Conf#{<<"sources">> := Default}, emqx_conf:get_raw([Authz])),
|
?assertEqual(Conf#{<<"sources">> => Default}, emqx_conf:get_raw([Authz])),
|
||||||
%% reset
|
%% reset
|
||||||
ConfBin2 = hocon_pp:do(#{<<"authorization">> => Conf}, #{}),
|
ConfBin2 = hocon_pp:do(#{<<"authorization">> => Conf}, #{}),
|
||||||
ConfFile2 = prepare_conf_file(?FUNCTION_NAME, ConfBin2, Config),
|
ConfFile2 = prepare_conf_file(?FUNCTION_NAME, ConfBin2, Config),
|
||||||
ok = emqx_conf_cli:conf(["load", ConfFile2]),
|
ok = emqx_conf_cli:conf(["load", ConfFile2]),
|
||||||
?assertEqual(Conf, emqx_conf:get_raw([Authz])),
|
?assertEqual(
|
||||||
|
Conf#{<<"sources">> => [emqx_authz_schema:default_authz()]},
|
||||||
|
emqx_conf:get_raw([Authz])
|
||||||
|
),
|
||||||
?assertEqual({error, empty_hocon_file}, emqx_conf_cli:conf(["load", "non-exist-file"])),
|
?assertEqual({error, empty_hocon_file}, emqx_conf_cli:conf(["load", "non-exist-file"])),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue