fix: update cluster.links via cli
This commit is contained in:
parent
083537daa3
commit
2783192f77
|
@ -46,6 +46,6 @@
|
||||||
)
|
)
|
||||||
).
|
).
|
||||||
|
|
||||||
-define(READONLY_KEYS, [cluster, rpc, node]).
|
-define(READONLY_KEYS, [rpc, node]).
|
||||||
|
|
||||||
-endif.
|
-endif.
|
||||||
|
|
|
@ -15,6 +15,8 @@
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
-module(emqx_conf_cli).
|
-module(emqx_conf_cli).
|
||||||
|
-feature(maybe_expr, enable).
|
||||||
|
|
||||||
-include("emqx_conf.hrl").
|
-include("emqx_conf.hrl").
|
||||||
-include_lib("emqx_auth/include/emqx_authn_chains.hrl").
|
-include_lib("emqx_auth/include/emqx_authn_chains.hrl").
|
||||||
-include_lib("emqx/include/logger.hrl").
|
-include_lib("emqx/include/logger.hrl").
|
||||||
|
@ -40,6 +42,7 @@
|
||||||
-define(UPDATE_READONLY_KEYS_PROHIBITED, <<"Cannot update read-only key '~s'.">>).
|
-define(UPDATE_READONLY_KEYS_PROHIBITED, <<"Cannot update read-only key '~s'.">>).
|
||||||
-define(SCHEMA_VALIDATION_CONF_ROOT_BIN, <<"schema_validation">>).
|
-define(SCHEMA_VALIDATION_CONF_ROOT_BIN, <<"schema_validation">>).
|
||||||
-define(MESSAGE_TRANSFORMATION_CONF_ROOT_BIN, <<"message_transformation">>).
|
-define(MESSAGE_TRANSFORMATION_CONF_ROOT_BIN, <<"message_transformation">>).
|
||||||
|
-define(LOCAL_OPTIONS, #{rawconf_with_defaults => true, persistent => false}).
|
||||||
-define(TIMEOUT, 30000).
|
-define(TIMEOUT, 30000).
|
||||||
|
|
||||||
-dialyzer({no_match, [load/0]}).
|
-dialyzer({no_match, [load/0]}).
|
||||||
|
@ -345,25 +348,21 @@ load_config_from_raw(RawConf0, Opts) ->
|
||||||
RawConf1 = emqx_config:upgrade_raw_conf(SchemaMod, RawConf0),
|
RawConf1 = emqx_config:upgrade_raw_conf(SchemaMod, RawConf0),
|
||||||
case check_config(RawConf1, Opts) of
|
case check_config(RawConf1, Opts) of
|
||||||
{ok, RawConf} ->
|
{ok, RawConf} ->
|
||||||
%% It has been ensured that the connector is always the first configuration to be updated.
|
case update_cluster_links(cluster, RawConf, Opts) of
|
||||||
%% However, when deleting the connector, we need to clean up the dependent actions/sources first;
|
ok ->
|
||||||
%% otherwise, the deletion will fail.
|
%% It has been ensured that the connector is always the first configuration to be updated.
|
||||||
%% notice: we can't create a action/sources before connector.
|
%% However, when deleting the connector, we need to clean up the dependent actions/sources first;
|
||||||
uninstall(<<"actions">>, RawConf, Opts),
|
%% otherwise, the deletion will fail.
|
||||||
uninstall(<<"sources">>, RawConf, Opts),
|
%% notice: we can't create a action/sources before connector.
|
||||||
Error =
|
uninstall(<<"actions">>, RawConf, Opts),
|
||||||
lists:filtermap(
|
uninstall(<<"sources">>, RawConf, Opts),
|
||||||
fun({K, V}) ->
|
Error = update_config_cluster(Opts, RawConf),
|
||||||
case update_config_cluster(K, V, Opts) of
|
case iolist_to_binary(Error) of
|
||||||
ok -> false;
|
<<"">> -> ok;
|
||||||
{error, Msg} -> {true, Msg}
|
ErrorBin -> {error, ErrorBin}
|
||||||
end
|
end;
|
||||||
end,
|
{error, Reason} ->
|
||||||
to_sorted_list(RawConf)
|
{error, Reason}
|
||||||
),
|
|
||||||
case iolist_to_binary(Error) of
|
|
||||||
<<"">> -> ok;
|
|
||||||
ErrorBin -> {error, ErrorBin}
|
|
||||||
end;
|
end;
|
||||||
{error, ?UPDATE_READONLY_KEYS_PROHIBITED, ReadOnlyKeyStr} ->
|
{error, ?UPDATE_READONLY_KEYS_PROHIBITED, ReadOnlyKeyStr} ->
|
||||||
Reason = iolist_to_binary(
|
Reason = iolist_to_binary(
|
||||||
|
@ -390,6 +389,26 @@ load_config_from_raw(RawConf0, Opts) ->
|
||||||
{error, Errors}
|
{error, Errors}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
update_config_cluster(Opts, RawConf) ->
|
||||||
|
lists:filtermap(
|
||||||
|
fun({K, V}) ->
|
||||||
|
case update_config_cluster(K, V, Opts) of
|
||||||
|
ok -> false;
|
||||||
|
{error, Msg} -> {true, Msg}
|
||||||
|
end
|
||||||
|
end,
|
||||||
|
to_sorted_list(RawConf)
|
||||||
|
).
|
||||||
|
|
||||||
|
update_cluster_links(cluster, #{<<"cluster">> := #{<<"links">> := Links}}, Opts) ->
|
||||||
|
Res = emqx_conf:update([<<"cluster">>, <<"links">>], Links, ?OPTIONS),
|
||||||
|
check_res(<<"cluster.links">>, Res, Links, Opts);
|
||||||
|
update_cluster_links(local, #{<<"cluster">> := #{<<"links">> := Links}}, Opts) ->
|
||||||
|
Res = emqx:update_config([<<"cluster">>, <<"links">>], Links, ?LOCAL_OPTIONS),
|
||||||
|
check_res(node(), <<"cluster.links">>, Res, Links, Opts);
|
||||||
|
update_cluster_links(_, _, _) ->
|
||||||
|
ok.
|
||||||
|
|
||||||
uninstall(ActionOrSource, Conf, #{mode := replace}) ->
|
uninstall(ActionOrSource, Conf, #{mode := replace}) ->
|
||||||
case maps:find(ActionOrSource, Conf) of
|
case maps:find(ActionOrSource, Conf) of
|
||||||
{ok, New} ->
|
{ok, New} ->
|
||||||
|
@ -449,7 +468,6 @@ update_config_cluster(Key, NewConf, #{mode := merge} = Opts) ->
|
||||||
update_config_cluster(Key, Value, #{mode := replace} = Opts) ->
|
update_config_cluster(Key, Value, #{mode := replace} = Opts) ->
|
||||||
check_res(Key, emqx_conf:update([Key], Value, ?OPTIONS), Value, Opts).
|
check_res(Key, emqx_conf:update([Key], Value, ?OPTIONS), Value, Opts).
|
||||||
|
|
||||||
-define(LOCAL_OPTIONS, #{rawconf_with_defaults => true, persistent => false}).
|
|
||||||
update_config_local(
|
update_config_local(
|
||||||
?EMQX_AUTHORIZATION_CONFIG_ROOT_NAME_BINARY = Key,
|
?EMQX_AUTHORIZATION_CONFIG_ROOT_NAME_BINARY = Key,
|
||||||
Conf,
|
Conf,
|
||||||
|
@ -509,15 +527,14 @@ suggest_msg(_, _) ->
|
||||||
<<"">>.
|
<<"">>.
|
||||||
|
|
||||||
check_config(Conf0, Opts) ->
|
check_config(Conf0, Opts) ->
|
||||||
case check_keys_is_not_readonly(Conf0, Opts) of
|
maybe
|
||||||
{ok, Conf1} ->
|
{ok, Conf1} ?= check_keys_is_not_readonly(Conf0, Opts),
|
||||||
Conf = emqx_config:fill_defaults(Conf1),
|
{ok, Conf2} ?= check_cluster_keys(Conf1, Opts),
|
||||||
case check_config_schema(Conf) of
|
Conf3 = emqx_config:fill_defaults(Conf2),
|
||||||
ok -> {ok, Conf};
|
ok ?= check_config_schema(Conf3),
|
||||||
{error, Reason} -> {error, Reason}
|
{ok, Conf3}
|
||||||
end;
|
else
|
||||||
Error ->
|
Error -> Error
|
||||||
Error
|
|
||||||
end.
|
end.
|
||||||
|
|
||||||
check_keys_is_not_readonly(Conf, Opts) ->
|
check_keys_is_not_readonly(Conf, Opts) ->
|
||||||
|
@ -535,6 +552,20 @@ check_keys_is_not_readonly(Conf, Opts) ->
|
||||||
{error, ?UPDATE_READONLY_KEYS_PROHIBITED, BadKeysStr}
|
{error, ?UPDATE_READONLY_KEYS_PROHIBITED, BadKeysStr}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
check_cluster_keys(Conf = #{<<"cluster">> := Cluster}, Opts) ->
|
||||||
|
IgnoreReadonly = maps:get(ignore_readonly, Opts, false),
|
||||||
|
case maps:keys(Cluster) -- [<<"links">>] of
|
||||||
|
[] ->
|
||||||
|
{ok, Conf};
|
||||||
|
Keys when IgnoreReadonly ->
|
||||||
|
?SLOG(info, #{msg => "readonly_root_keys_ignored", keys => Keys}),
|
||||||
|
{ok, Conf#{<<"cluster">> => maps:with([<<"links">>], Cluster)}};
|
||||||
|
Keys ->
|
||||||
|
BadKeys = ["cluster." ++ K || K <- Keys],
|
||||||
|
BadKeysStr = lists:join(<<",">>, BadKeys),
|
||||||
|
{error, ?UPDATE_READONLY_KEYS_PROHIBITED, BadKeysStr}
|
||||||
|
end.
|
||||||
|
|
||||||
check_config_schema(Conf) ->
|
check_config_schema(Conf) ->
|
||||||
SchemaMod = emqx_conf:schema_module(),
|
SchemaMod = emqx_conf:schema_module(),
|
||||||
Fold = fun({Key, Value}, Acc) ->
|
Fold = fun({Key, Value}, Acc) ->
|
||||||
|
@ -604,23 +635,28 @@ filter_readonly_config(Raw) ->
|
||||||
end.
|
end.
|
||||||
|
|
||||||
reload_config(AllConf, Opts) ->
|
reload_config(AllConf, Opts) ->
|
||||||
uninstall(<<"actions">>, AllConf, Opts),
|
case update_cluster_links(local, AllConf, Opts) of
|
||||||
uninstall(<<"sources">>, AllConf, Opts),
|
ok ->
|
||||||
Fold = fun({Key, Conf}, Acc) ->
|
uninstall(<<"actions">>, AllConf, Opts),
|
||||||
case update_config_local(Key, Conf, Opts) of
|
uninstall(<<"sources">>, AllConf, Opts),
|
||||||
ok ->
|
Fold = fun({Key, Conf}, Acc) ->
|
||||||
Acc;
|
case update_config_local(Key, Conf, Opts) of
|
||||||
Error ->
|
ok ->
|
||||||
?SLOG(error, #{
|
Acc;
|
||||||
msg => "failed_to_reload_etc_config",
|
Error ->
|
||||||
key => Key,
|
?SLOG(error, #{
|
||||||
value => Conf,
|
msg => "failed_to_reload_etc_config",
|
||||||
error => Error
|
key => Key,
|
||||||
}),
|
value => Conf,
|
||||||
[{Key, Error} | Acc]
|
error => Error
|
||||||
end
|
}),
|
||||||
end,
|
[{Key, Error} | Acc]
|
||||||
sorted_fold(Fold, AllConf).
|
end
|
||||||
|
end,
|
||||||
|
sorted_fold(Fold, AllConf);
|
||||||
|
{error, Reason} ->
|
||||||
|
{error, Reason}
|
||||||
|
end.
|
||||||
|
|
||||||
sorted_fold(Func, Conf) ->
|
sorted_fold(Func, Conf) ->
|
||||||
case lists:foldl(Func, [], to_sorted_list(Conf)) of
|
case lists:foldl(Func, [], to_sorted_list(Conf)) of
|
||||||
|
@ -629,10 +665,11 @@ sorted_fold(Func, Conf) ->
|
||||||
end.
|
end.
|
||||||
|
|
||||||
to_sorted_list(Conf0) ->
|
to_sorted_list(Conf0) ->
|
||||||
|
Conf1 = maps:remove(<<"cluster">>, Conf0),
|
||||||
%% connectors > actions/bridges > rule_engine
|
%% connectors > actions/bridges > rule_engine
|
||||||
Keys = [<<"connectors">>, <<"actions">>, <<"bridges">>, <<"rule_engine">>],
|
Keys = [<<"connectors">>, <<"actions">>, <<"bridges">>, <<"rule_engine">>],
|
||||||
{HighPriorities, Conf1} = split_high_priority_conf(Keys, Conf0, []),
|
{HighPriorities, Conf2} = split_high_priority_conf(Keys, Conf1, []),
|
||||||
HighPriorities ++ lists:keysort(1, maps:to_list(Conf1)).
|
HighPriorities ++ lists:keysort(1, maps:to_list(Conf2)).
|
||||||
|
|
||||||
split_high_priority_conf([], Conf0, Acc) ->
|
split_high_priority_conf([], Conf0, Acc) ->
|
||||||
{lists:reverse(Acc), Conf0};
|
{lists:reverse(Acc), Conf0};
|
||||||
|
|
Loading…
Reference in New Issue