%%-------------------------------------------------------------------- %% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. %% You may obtain a copy of the License at %% %% http://www.apache.org/licenses/LICENSE-2.0 %% %% Unless required by applicable law or agreed to in writing, software %% distributed under the License is distributed on an "AS IS" BASIS, %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. %% See the License for the specific language governing permissions and %% limitations under the License. %%-------------------------------------------------------------------- -module(emqx_conf_cli). -include("emqx_conf.hrl"). -include_lib("emqx_auth/include/emqx_authn_chains.hrl"). -include_lib("emqx/include/logger.hrl"). -export([ load/0, admins/1, conf/1, audit/3, unload/0 ]). -export([keys/0, get_config/0, get_config/1, load_config/2]). -include_lib("hocon/include/hoconsc.hrl"). %% kept cluster_call for compatibility -define(CLUSTER_CALL, cluster_call). -define(CONF, conf). -define(AUDIT_MOD, audit). -define(UPDATE_READONLY_KEYS_PROHIBITED, <<"update_readonly_keys_prohibited">>). -dialyzer({no_match, [load/0]}). load() -> emqx_ctl:register_command(?CLUSTER_CALL, {?MODULE, admins}, [hidden]), emqx_ctl:register_command(?CONF, {?MODULE, conf}, []), case emqx_release:edition() of ee -> emqx_ctl:register_command(?AUDIT_MOD, {?MODULE, audit}, [hidden]); ce -> ok end, ok. unload() -> emqx_ctl:unregister_command(?CLUSTER_CALL), emqx_ctl:unregister_command(?CONF), emqx_ctl:unregister_command(?AUDIT_MOD). conf(["show_keys" | _]) -> print_keys(keys()); conf(["show"]) -> print_hocon(get_config()); conf(["show", Key]) -> print_hocon(get_config(list_to_binary(Key))); conf(["load", "--replace", Path]) -> load_config(Path, #{mode => replace}); conf(["load", "--merge", Path]) -> load_config(Path, #{mode => merge}); conf(["load", Path]) -> load_config(Path, #{mode => merge}); conf(["cluster_sync" | Args]) -> admins(Args); conf(["reload", "--merge"]) -> reload_etc_conf_on_local_node(#{mode => merge}); conf(["reload", "--replace"]) -> reload_etc_conf_on_local_node(#{mode => replace}); conf(["reload"]) -> conf(["reload", "--merge"]); conf(_) -> emqx_ctl:usage(usage_conf() ++ usage_sync()). admins(["status"]) -> status(); admins(["skip"]) -> status(), Nodes = mria:running_nodes(), lists:foreach(fun emqx_cluster_rpc:skip_failed_commit/1, Nodes), status(); admins(["skip", Node0]) -> status(), Node = list_to_existing_atom(Node0), emqx_cluster_rpc:skip_failed_commit(Node), status(); admins(["tnxid", TnxId0]) -> %% changed to 'inspect' in 5.6 %% TODO: delete this clause in 5.7 admins(["inspect", TnxId0]); admins(["inspect", TnxId0]) -> TnxId = list_to_integer(TnxId0), print(emqx_cluster_rpc:query(TnxId)); admins(["fast_forward"]) -> status(), Nodes = mria:running_nodes(), TnxId = emqx_cluster_rpc:latest_tnx_id(), lists:foreach(fun(N) -> emqx_cluster_rpc:fast_forward_to_commit(N, TnxId) end, Nodes), status(); admins(["fast_forward", ToTnxId]) -> status(), Nodes = mria:running_nodes(), TnxId = list_to_integer(ToTnxId), lists:foreach(fun(N) -> emqx_cluster_rpc:fast_forward_to_commit(N, TnxId) end, Nodes), status(); admins(["fast_forward", Node0, ToTnxId]) -> status(), TnxId = list_to_integer(ToTnxId), Node = list_to_existing_atom(Node0), emqx_cluster_rpc:fast_forward_to_commit(Node, TnxId), status(); admins(_) -> emqx_ctl:usage(usage_sync()). audit(Level, From, Log) -> ?AUDIT(Level, redact(Log#{from => From})). redact(Logs = #{cmd := admins, args := [<<"add">>, Username, _Password | Rest]}) -> Logs#{args => [<<"add">>, Username, <<"******">> | Rest]}; redact(Logs = #{cmd := admins, args := [<<"passwd">>, Username, _Password]}) -> Logs#{args => [<<"passwd">>, Username, <<"******">>]}; redact(Logs = #{cmd := license, args := [<<"update">>, _License]}) -> Logs#{args => [<<"update">>, "******"]}; redact(Logs) -> Logs. usage_conf() -> [ {"conf reload --replace|--merge", "reload etc/emqx.conf on local node"}, {"", "The new configuration values will be overlaid on the existing values by default."}, {"", "use the --replace flag to replace existing values with the new ones instead."}, {"----------------------------------", "------------"}, {"conf show_keys", "print all the currently used configuration keys."}, {"conf show []", "Print in-use configs (including default values) under the given key."}, {"", "Print ALL keys if key is not provided"}, {"conf load --replace|--merge ", "Load a HOCON format config file."}, {"", "The new configuration values will be overlaid on the existing values by default."}, {"", "use the --replace flag to replace existing values with the new ones instead."}, {"", "The current node will initiate a cluster wide config change"}, {"", "transaction to sync the changes to other nodes in the cluster. "}, {"", "NOTE: do not make runtime config changes during rolling upgrade."}, {"----------------------------------", "------------"} ]. usage_sync() -> [ {"conf cluster_sync status", "Show cluster config sync status summary for all nodes."}, {"conf cluster_sync inspect ", "Inspect detailed information of the config change transaction at the given commit ID"}, {"conf cluster_sync skip [node]", "Increment the (currently failing) commit on the given node.\n" "WARNING: This results in inconsistent configs among the clustered nodes."}, {"conf cluster_sync fast_forward [node] ", "Fast-forward config change to the given commit ID on the given node.\n" "WARNING: This results in inconsistent configs among the clustered nodes."} ]. status() -> emqx_ctl:print("-----------------------------------------------\n"), {atomic, Status} = emqx_cluster_rpc:status(), lists:foreach( fun(S) -> #{ node := Node, tnx_id := TnxId, mfa := {M, F, A}, created_at := CreatedAt } = S, emqx_ctl:print( "~p:[~w] CreatedAt:~p ~p:~p/~w\n", [Node, TnxId, CreatedAt, M, F, length(A)] ) end, Status ), emqx_ctl:print("-----------------------------------------------\n"). print_keys(Keys) -> SortKeys = lists:sort(Keys), emqx_ctl:print("~1p~n", [[binary_to_existing_atom(K) || K <- SortKeys]]). print(Json) -> emqx_ctl:print("~ts~n", [emqx_logger_jsonfmt:best_effort_json(Json)]). print_hocon(Hocon) when is_map(Hocon) -> emqx_ctl:print("~ts~n", [hocon_pp:do(Hocon, #{})]); print_hocon({error, Error}) -> emqx_ctl:warning("~ts~n", [Error]). get_config() -> AllConf = fill_defaults(emqx:get_raw_config([])), drop_hidden_roots(AllConf). keys() -> emqx_config:get_root_names() -- hidden_roots(). drop_hidden_roots(Conf) -> maps:without(hidden_roots(), Conf). hidden_roots() -> [ <<"trace">>, <<"stats">>, <<"broker">>, <<"persistent_session_store">>, <<"session_persistence">>, <<"plugins">>, <<"zones">> ]. get_config(Key) -> case emqx:get_raw_config([Key], undefined) of undefined -> {error, "key_not_found"}; Value -> fill_defaults(#{Key => Value}) end. -define(OPTIONS, #{rawconf_with_defaults => true, override_to => cluster}). load_config(Path, Opts) when is_list(Path) -> case hocon:files([Path]) of {ok, RawConf} when RawConf =:= #{} -> emqx_ctl:warning("load ~ts is empty~n", [Path]), {error, empty_hocon_file}; {ok, RawConf} -> load_config_from_raw(RawConf, Opts); {error, Reason} -> emqx_ctl:warning("load ~ts failed~n~p~n", [Path, Reason]), {error, bad_hocon_file} end; load_config(Bin, Opts) when is_binary(Bin) -> case hocon:binary(Bin) of {ok, RawConf} -> load_config_from_raw(RawConf, Opts); {error, Reason} -> {error, Reason} end. load_config_from_raw(RawConf0, Opts) -> SchemaMod = emqx_conf:schema_module(), RawConf1 = emqx_config:upgrade_raw_conf(SchemaMod, RawConf0), RawConf = emqx_config:fill_defaults(RawConf1), case check_config(RawConf) of ok -> %% It has been ensured that the connector is always the first configuration to be updated. %% However, when deleting the connector, we need to clean up the dependent actions first; %% otherwise, the deletion will fail. %% notice: we can't create a action before connector. uninstall_actions(RawConf, Opts), Error = lists:filtermap( fun({K, V}) -> case update_config_cluster(K, V, Opts) of ok -> false; {error, Msg} -> {true, Msg} end end, to_sorted_list(RawConf) ), case iolist_to_binary(Error) of <<"">> -> ok; ErrorBin -> {error, ErrorBin} end; {error, ?UPDATE_READONLY_KEYS_PROHIBITED = Reason} -> warning(Opts, "load config failed~n~ts~n", [Reason]), warning( Opts, "Maybe try `emqx_ctl conf reload` to reload etc/emqx.conf on local node~n", [] ), {error, Reason}; {error, Errors} -> warning(Opts, "load schema check failed~n", []), lists:foreach( fun({Key, Error}) -> warning(Opts, "~ts: ~p~n", [Key, Error]) end, Errors ), {error, Errors} end. uninstall_actions(#{<<"actions">> := New}, #{mode := replace}) -> Old = emqx_conf:get_raw([<<"actions">>], #{}), #{removed := Removed} = emqx_bridge_v2:diff_confs(New, Old), maps:foreach( fun({Type, Name}, _) -> case emqx_bridge_v2:remove(Type, Name) of ok -> ok; {error, Reason} -> ?SLOG(error, #{ msg => "failed_to_remove_action", type => Type, name => Name, error => Reason }) end end, Removed ); %% we don't delete things when in merge mode or without actions key. uninstall_actions(_RawConf, _) -> ok. update_config_cluster( ?EMQX_AUTHORIZATION_CONFIG_ROOT_NAME_BINARY = Key, Conf, #{mode := merge} = Opts ) -> check_res(Key, emqx_authz:merge(Conf), Conf, Opts); update_config_cluster( ?EMQX_AUTHENTICATION_CONFIG_ROOT_NAME_BINARY = Key, Conf, #{mode := merge} = Opts ) -> check_res(Key, emqx_authn:merge_config(Conf), Conf, Opts); update_config_cluster(Key, NewConf, #{mode := merge} = Opts) -> Merged = merge_conf(Key, NewConf), check_res(Key, emqx_conf:update([Key], Merged, ?OPTIONS), NewConf, Opts); update_config_cluster(Key, Value, #{mode := replace} = Opts) -> check_res(Key, emqx_conf:update([Key], Value, ?OPTIONS), Value, Opts). -define(LOCAL_OPTIONS, #{rawconf_with_defaults => true, persistent => false}). update_config_local( ?EMQX_AUTHORIZATION_CONFIG_ROOT_NAME_BINARY = Key, Conf, #{mode := merge} = Opts ) -> check_res(node(), Key, emqx_authz:merge_local(Conf, ?LOCAL_OPTIONS), Conf, Opts); update_config_local( ?EMQX_AUTHENTICATION_CONFIG_ROOT_NAME_BINARY = Key, Conf, #{mode := merge} = Opts ) -> check_res(node(), Key, emqx_authn:merge_config_local(Conf, ?LOCAL_OPTIONS), Conf, Opts); update_config_local(Key, NewConf, #{mode := merge} = Opts) -> Merged = merge_conf(Key, NewConf), check_res(node(), Key, emqx:update_config([Key], Merged, ?LOCAL_OPTIONS), NewConf, Opts); update_config_local(Key, Value, #{mode := replace} = Opts) -> check_res(node(), Key, emqx:update_config([Key], Value, ?LOCAL_OPTIONS), Value, Opts). check_res(Key, Res, Conf, Opts) -> check_res(cluster, Key, Res, Conf, Opts). check_res(Node, Key, {ok, _}, _Conf, Opts) -> print(Opts, "load ~ts on ~p ok~n", [Key, Node]), ok; check_res(_Node, Key, {error, Reason}, Conf, Opts = #{mode := Mode}) -> Warning = "Can't ~ts the new configurations!~n" "Root key: ~ts~n" "Reason: ~p~n", ActiveMsg0 = "The effective configurations:~n" "```~n" "~ts```~n~n", ActiveMsg = io_lib:format(ActiveMsg0, [hocon_pp:do(#{Key => emqx_conf:get_raw([Key])}, #{})]), FailedMsg0 = "Try to ~ts with:~n" "```~n" "~ts```~n", FailedMsg = io_lib:format(FailedMsg0, [Mode, hocon_pp:do(#{Key => Conf}, #{})]), SuggestMsg = suggest_msg(Reason, Mode), Msg = iolist_to_binary([ActiveMsg, FailedMsg, SuggestMsg]), print(Opts, "~ts~n", [Msg]), warning(Opts, Warning, [Mode, Key, Reason]), {error, iolist_to_binary([Msg, "\n", io_lib:format(Warning, [Mode, Key, Reason])])}. %% The mix data failed validation, suggest the user to retry with another mode. suggest_msg(#{kind := validation_error, reason := unknown_fields}, Mode) -> RetryMode = case Mode of merge -> "replace"; replace -> "merge" end, io_lib:format( "Tips: There may be some conflicts in the new configuration under `~ts` mode,~n" "Please retry with the `~ts` mode.", [Mode, RetryMode] ); suggest_msg(_, _) -> <<"">>. check_config(Conf) -> case check_keys_is_not_readonly(Conf) of ok -> check_config_schema(Conf); Error -> Error end. check_keys_is_not_readonly(Conf) -> Keys = maps:keys(Conf), ReadOnlyKeys = [atom_to_binary(K) || K <- ?READONLY_KEYS], case ReadOnlyKeys -- Keys of ReadOnlyKeys -> ok; _ -> {error, ?UPDATE_READONLY_KEYS_PROHIBITED} end. check_config_schema(Conf) -> SchemaMod = emqx_conf:schema_module(), Fold = fun({Key, Value}, Acc) -> case check_config(SchemaMod, Key, Value) of {ok, _} -> Acc; {error, Reason} -> [{Key, Reason} | Acc] end end, sorted_fold(Fold, Conf). %% @doc Reload etc/emqx.conf to runtime config except for the readonly config -spec reload_etc_conf_on_local_node(#{mode => replace | merge}) -> ok | {error, term()}. reload_etc_conf_on_local_node(Opts) -> case load_etc_config_file() of {ok, RawConf} -> case filter_readonly_config(RawConf) of {ok, Reloaded} -> reload_config(Reloaded, Opts); {error, Error} -> warning(Opts, "check config failed~n~p~n", [Error]), {error, Error} end; {error, Error} -> warning(Opts, "bad_hocon_file~n ~p~n", [Error]), {error, bad_hocon_file} end. %% @doc Merge etc/emqx.conf on top of cluster.hocon. %% For example: %% `authorization.sources` will be merged into cluster.hocon when updated via dashboard, %% but `authorization.sources` in not in the default emqx.conf file. %% To make sure all root keys in emqx.conf has a fully merged value. load_etc_config_file() -> ConfFiles = emqx_config:config_files(), Opts = #{format => map, include_dirs => emqx_config:include_dirs()}, case hocon:files(ConfFiles, Opts) of {ok, RawConf} -> HasDeprecatedFile = emqx_config:has_deprecated_file(), %% Merge etc.conf on top of cluster.hocon, %% Don't use map deep_merge, use hocon files merge instead. %% In order to have a chance to delete. (e.g. zones.zone1.mqtt = null) Keys = maps:keys(RawConf), MergedRaw = emqx_config:load_config_files(HasDeprecatedFile, ConfFiles), {ok, maps:with(Keys, MergedRaw)}; {error, Error} -> ?SLOG(error, #{ msg => "failed_to_read_etc_config", files => ConfFiles, error => Error }), {error, Error} end. filter_readonly_config(Raw) -> SchemaMod = emqx_conf:schema_module(), try RawDefault = fill_defaults(Raw), _ = emqx_config:check_config(SchemaMod, RawDefault), ReadOnlyKeys = [atom_to_binary(K) || K <- ?READONLY_KEYS], {ok, maps:without(ReadOnlyKeys, Raw)} catch throw:Error -> ?SLOG(error, #{ msg => "bad_etc_config_schema_found", error => Error }), {error, Error} end. reload_config(AllConf, Opts) -> uninstall_actions(AllConf, Opts), Fold = fun({Key, Conf}, Acc) -> case update_config_local(Key, Conf, Opts) of ok -> Acc; Error -> ?SLOG(error, #{ msg => "failed_to_reload_etc_config", key => Key, value => Conf, error => Error }), [{Key, Error} | Acc] end end, sorted_fold(Fold, AllConf). sorted_fold(Func, Conf) -> case lists:foldl(Func, [], to_sorted_list(Conf)) of [] -> ok; Error -> {error, Error} end. to_sorted_list(Conf0) -> %% connectors > actions/bridges > rule_engine Keys = [<<"connectors">>, <<"actions">>, <<"bridges">>, <<"rule_engine">>], {HighPriorities, Conf1} = split_high_priority_conf(Keys, Conf0, []), HighPriorities ++ lists:keysort(1, maps:to_list(Conf1)). split_high_priority_conf([], Conf0, Acc) -> {lists:reverse(Acc), Conf0}; split_high_priority_conf([Key | Keys], Conf0, Acc) -> case maps:take(Key, Conf0) of error -> split_high_priority_conf(Keys, Conf0, Acc); {Value, Conf1} -> split_high_priority_conf(Keys, Conf1, [{Key, Value} | Acc]) end. merge_conf(Key, NewConf) -> OldConf = emqx_conf:get_raw([Key]), do_merge_conf(OldConf, NewConf). do_merge_conf(OldConf = #{}, NewConf = #{}) -> emqx_utils_maps:deep_merge(OldConf, NewConf); do_merge_conf(_OldConf, NewConf) -> NewConf. fill_defaults(Conf) -> Conf1 = emqx_config:fill_defaults(Conf), filter_cluster_conf(Conf1). -define(ALL_STRATEGY, [<<"manual">>, <<"static">>, <<"dns">>, <<"etcd">>, <<"k8s">>]). filter_cluster_conf(#{<<"cluster">> := #{<<"discovery_strategy">> := Strategy} = Cluster} = Conf) -> Cluster1 = maps:without(lists:delete(Strategy, ?ALL_STRATEGY), Cluster), Conf#{<<"cluster">> => Cluster1}; filter_cluster_conf(Conf) -> Conf. check_config(SchemaMod, Key, Value) -> try Schema = emqx_config_handler:schema(SchemaMod, [Key]), {_AppEnvs, CheckedConf} = emqx_config:check_config(Schema, #{Key => Value}), {ok, CheckedConf} catch throw:Error -> {error, Error} end. warning(#{log := none}, _, _) -> ok; warning(_, Format, Args) -> emqx_ctl:warning(Format, Args). print(#{log := none}, _, _) -> ok; print(_, Format, Args) -> emqx_ctl:print(Format, Args).