Merge pull request #13443 from zhongwencool/cluster-link-cli-load
fix: update cluster.links via cli
This commit is contained in:
commit
ac52bf39ce
|
@ -499,15 +499,14 @@ fill_defaults(RawConf, Opts) ->
|
||||||
).
|
).
|
||||||
|
|
||||||
-spec fill_defaults(module(), raw_config(), hocon_tconf:opts()) -> map().
|
-spec fill_defaults(module(), raw_config(), hocon_tconf:opts()) -> map().
|
||||||
fill_defaults(_SchemaMod, RawConf = #{<<"durable_storage">> := _}, _) ->
|
fill_defaults(SchemaMod, RawConf = #{<<"durable_storage">> := Ds}, Opts) ->
|
||||||
%% FIXME: kludge to prevent `emqx_config' module from filling in
|
%% FIXME: kludge to prevent `emqx_config' module from filling in
|
||||||
%% the default values for backends and layouts. These records are
|
%% the default values for backends and layouts. These records are
|
||||||
%% inside unions, and adding default values there will add
|
%% inside unions, and adding default values there will add
|
||||||
%% incompatible fields.
|
%% incompatible fields.
|
||||||
%%
|
RawConf1 = maps:remove(<<"durable_storage">>, RawConf),
|
||||||
%% Note: this function is called for each individual conf root, so
|
Conf = fill_defaults(SchemaMod, RawConf1, Opts),
|
||||||
%% this clause only affects this particular subtree.
|
Conf#{<<"durable_storage">> => Ds};
|
||||||
RawConf;
|
|
||||||
fill_defaults(SchemaMod, RawConf, Opts0) ->
|
fill_defaults(SchemaMod, RawConf, Opts0) ->
|
||||||
Opts = maps:merge(#{required => false, make_serializable => true}, Opts0),
|
Opts = maps:merge(#{required => false, make_serializable => true}, Opts0),
|
||||||
hocon_tconf:check_plain(
|
hocon_tconf:check_plain(
|
||||||
|
|
|
@ -56,9 +56,31 @@ mk_clusters(NameA, NameB, PortA, PortB, ConfA, ConfB, Config) ->
|
||||||
),
|
),
|
||||||
{NodesA, NodesB}.
|
{NodesA, NodesB}.
|
||||||
|
|
||||||
|
t_config_update_cli('init', Config0) ->
|
||||||
|
Config1 =
|
||||||
|
[
|
||||||
|
{name_prefix, ?FUNCTION_NAME}
|
||||||
|
| Config0
|
||||||
|
],
|
||||||
|
Config2 = t_config_update('init', Config1),
|
||||||
|
[
|
||||||
|
{update_from, cli}
|
||||||
|
| lists:keydelete(update_from, 1, Config2)
|
||||||
|
];
|
||||||
|
t_config_update_cli('end', Config) ->
|
||||||
|
t_config_update('end', Config).
|
||||||
|
|
||||||
|
t_config_update_cli(Config) ->
|
||||||
|
t_config_update(Config).
|
||||||
|
|
||||||
t_config_update('init', Config) ->
|
t_config_update('init', Config) ->
|
||||||
NameA = fmt("~s_~s", [?FUNCTION_NAME, "a"]),
|
NamePrefix =
|
||||||
NameB = fmt("~s_~s", [?FUNCTION_NAME, "b"]),
|
case ?config(name_prefix, Config) of
|
||||||
|
undefined -> ?FUNCTION_NAME;
|
||||||
|
Name -> Name
|
||||||
|
end,
|
||||||
|
NameA = fmt("~s_~s", [NamePrefix, "a"]),
|
||||||
|
NameB = fmt("~s_~s", [NamePrefix, "b"]),
|
||||||
LPortA = 31883,
|
LPortA = 31883,
|
||||||
LPortB = 41883,
|
LPortB = 41883,
|
||||||
ConfA = combine([conf_cluster(NameA), conf_log()]),
|
ConfA = combine([conf_cluster(NameA), conf_log()]),
|
||||||
|
@ -73,7 +95,8 @@ t_config_update('init', Config) ->
|
||||||
{lport_a, LPortA},
|
{lport_a, LPortA},
|
||||||
{lport_b, LPortB},
|
{lport_b, LPortB},
|
||||||
{name_a, NameA},
|
{name_a, NameA},
|
||||||
{name_b, NameB}
|
{name_b, NameB},
|
||||||
|
{update_from, api}
|
||||||
| Config
|
| Config
|
||||||
];
|
];
|
||||||
t_config_update('end', Config) ->
|
t_config_update('end', Config) ->
|
||||||
|
@ -113,12 +136,12 @@ t_config_update(Config) ->
|
||||||
|
|
||||||
{ok, SubRef} = snabbkaffe:subscribe(
|
{ok, SubRef} = snabbkaffe:subscribe(
|
||||||
?match_event(#{?snk_kind := clink_route_bootstrap_complete}),
|
?match_event(#{?snk_kind := clink_route_bootstrap_complete}),
|
||||||
%% 5 nodes = 5 actors (durable storage is dsabled)
|
%% 5 nodes = 5 actors (durable storage is disabled)
|
||||||
5,
|
5,
|
||||||
30_000
|
30_000
|
||||||
),
|
),
|
||||||
?assertMatch({ok, _}, erpc:call(NodeA1, emqx_cluster_link_config, update, [[LinkConfA]])),
|
?assertMatch({ok, _}, update(NodeA1, [LinkConfA], Config)),
|
||||||
?assertMatch({ok, _}, erpc:call(NodeB1, emqx_cluster_link_config, update, [[LinkConfB]])),
|
?assertMatch({ok, _}, update(NodeB1, [LinkConfB], Config)),
|
||||||
|
|
||||||
?assertMatch(
|
?assertMatch(
|
||||||
{ok, [
|
{ok, [
|
||||||
|
@ -158,7 +181,7 @@ t_config_update(Config) ->
|
||||||
|
|
||||||
%% update link
|
%% update link
|
||||||
LinkConfA1 = LinkConfA#{<<"pool_size">> => 2, <<"topics">> => [<<"t/new/+">>]},
|
LinkConfA1 = LinkConfA#{<<"pool_size">> => 2, <<"topics">> => [<<"t/new/+">>]},
|
||||||
?assertMatch({ok, _}, erpc:call(NodeA1, emqx_cluster_link_config, update, [[LinkConfA1]])),
|
?assertMatch({ok, _}, update(NodeA1, [LinkConfA1], Config)),
|
||||||
|
|
||||||
?assertMatch(
|
?assertMatch(
|
||||||
{ok, [
|
{ok, [
|
||||||
|
@ -187,20 +210,21 @@ t_config_update(Config) ->
|
||||||
|
|
||||||
%% disable link
|
%% disable link
|
||||||
LinkConfA2 = LinkConfA1#{<<"enable">> => false},
|
LinkConfA2 = LinkConfA1#{<<"enable">> => false},
|
||||||
?assertMatch({ok, _}, erpc:call(NodeA1, emqx_cluster_link_config, update, [[LinkConfA2]])),
|
?assertMatch({ok, _}, update(NodeA1, [LinkConfA2], Config)),
|
||||||
%% must be already blocked by the receiving cluster even if externak routing state is not
|
%% must be already blocked by the receiving cluster even if external routing state is not
|
||||||
%% updated yet
|
%% updated yet
|
||||||
{ok, _} = emqtt:publish(ClientB, <<"t/new/1">>, <<"not-expected-hello-from-b-1">>, qos1),
|
{ok, _} = emqtt:publish(ClientB, <<"t/new/1">>, <<"not-expected-hello-from-b-1">>, qos1),
|
||||||
|
|
||||||
LinkConfB1 = LinkConfB#{<<"enable">> => false},
|
LinkConfB1 = LinkConfB#{<<"enable">> => false},
|
||||||
?assertMatch({ok, _}, erpc:call(NodeB1, emqx_cluster_link_config, update, [[LinkConfB1]])),
|
?assertMatch({ok, _}, update(NodeB1, [LinkConfB1], Config)),
|
||||||
{ok, _} = emqtt:publish(ClientA, <<"t/test-topic">>, <<"not-expected-hello-from-a">>, qos1),
|
{ok, _} = emqtt:publish(ClientA, <<"t/test-topic">>, <<"not-expected-hello-from-a">>, qos1),
|
||||||
|
|
||||||
?assertNotReceive({publish, _Message = #{}}, 3000),
|
?assertNotReceive({publish, _Message = #{}}, 3000),
|
||||||
|
|
||||||
%% delete links
|
%% delete links
|
||||||
?assertMatch({ok, _}, erpc:call(NodeA1, emqx_cluster_link_config, update, [[]])),
|
?assertMatch({ok, _}, update(NodeA1, [], Config)),
|
||||||
?assertMatch({ok, _}, erpc:call(NodeB1, emqx_cluster_link_config, update, [[]])),
|
?assertMatch({ok, _}, update(NodeB1, [], Config)),
|
||||||
|
validate_update_cli_failed(NodeA1, Config),
|
||||||
|
|
||||||
ok = emqtt:stop(ClientA),
|
ok = emqtt:stop(ClientA),
|
||||||
ok = emqtt:stop(ClientB).
|
ok = emqtt:stop(ClientB).
|
||||||
|
@ -645,3 +669,51 @@ fmt(Fmt, Args) ->
|
||||||
|
|
||||||
mk_nodename(BaseName, Idx) ->
|
mk_nodename(BaseName, Idx) ->
|
||||||
binary_to_atom(fmt("emqx_clink_~s_~b", [BaseName, Idx])).
|
binary_to_atom(fmt("emqx_clink_~s_~b", [BaseName, Idx])).
|
||||||
|
|
||||||
|
validate_update_cli_failed(Node, Config) ->
|
||||||
|
case ?config(update_from, Config) of
|
||||||
|
api ->
|
||||||
|
ok;
|
||||||
|
cli ->
|
||||||
|
ConfBin = hocon_pp:do(
|
||||||
|
#{
|
||||||
|
<<"cluster">> => #{
|
||||||
|
<<"links">> => [],
|
||||||
|
<<"autoclean">> => <<"12h">>
|
||||||
|
}
|
||||||
|
},
|
||||||
|
#{}
|
||||||
|
),
|
||||||
|
ConfFile = prepare_conf_file(?FUNCTION_NAME, ConfBin, Config),
|
||||||
|
?assertMatch(
|
||||||
|
{error, <<"Cannot update read-only key 'cluster.autoclean'.">>},
|
||||||
|
erpc:call(Node, emqx_conf_cli, conf, [["load", ConfFile]])
|
||||||
|
)
|
||||||
|
end.
|
||||||
|
|
||||||
|
update(Node, Links, Config) ->
|
||||||
|
case ?config(update_from, Config) of
|
||||||
|
api -> update_links_from_api(Node, Links, Config);
|
||||||
|
cli -> update_links_from_cli(Node, Links, Config)
|
||||||
|
end.
|
||||||
|
|
||||||
|
update_links_from_api(Node, Links, _Config) ->
|
||||||
|
erpc:call(Node, emqx_cluster_link_config, update, [Links]).
|
||||||
|
|
||||||
|
update_links_from_cli(Node, Links, Config) ->
|
||||||
|
ConfBin = hocon_pp:do(#{<<"cluster">> => #{<<"links">> => Links}}, #{}),
|
||||||
|
ConfFile = prepare_conf_file(?FUNCTION_NAME, ConfBin, Config),
|
||||||
|
case erpc:call(Node, emqx_conf_cli, conf, [["load", ConfFile]]) of
|
||||||
|
ok -> {ok, Links};
|
||||||
|
Error -> Error
|
||||||
|
end.
|
||||||
|
|
||||||
|
prepare_conf_file(Name, Content, CTConfig) ->
|
||||||
|
Filename = tc_conf_file(Name, CTConfig),
|
||||||
|
filelib:ensure_dir(Filename),
|
||||||
|
ok = file:write_file(Filename, Content),
|
||||||
|
Filename.
|
||||||
|
|
||||||
|
tc_conf_file(TC, Config) ->
|
||||||
|
DataDir = ?config(data_dir, Config),
|
||||||
|
filename:join([DataDir, TC, 'emqx.conf']).
|
||||||
|
|
|
@ -46,6 +46,4 @@
|
||||||
)
|
)
|
||||||
).
|
).
|
||||||
|
|
||||||
-define(READONLY_KEYS, [cluster, rpc, node]).
|
|
||||||
|
|
||||||
-endif.
|
-endif.
|
||||||
|
|
|
@ -15,6 +15,8 @@
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
-module(emqx_conf_cli).
|
-module(emqx_conf_cli).
|
||||||
|
-feature(maybe_expr, enable).
|
||||||
|
|
||||||
-include("emqx_conf.hrl").
|
-include("emqx_conf.hrl").
|
||||||
-include_lib("emqx_auth/include/emqx_authn_chains.hrl").
|
-include_lib("emqx_auth/include/emqx_authn_chains.hrl").
|
||||||
-include_lib("emqx/include/logger.hrl").
|
-include_lib("emqx/include/logger.hrl").
|
||||||
|
@ -40,8 +42,12 @@
|
||||||
-define(UPDATE_READONLY_KEYS_PROHIBITED, <<"Cannot update read-only key '~s'.">>).
|
-define(UPDATE_READONLY_KEYS_PROHIBITED, <<"Cannot update read-only key '~s'.">>).
|
||||||
-define(SCHEMA_VALIDATION_CONF_ROOT_BIN, <<"schema_validation">>).
|
-define(SCHEMA_VALIDATION_CONF_ROOT_BIN, <<"schema_validation">>).
|
||||||
-define(MESSAGE_TRANSFORMATION_CONF_ROOT_BIN, <<"message_transformation">>).
|
-define(MESSAGE_TRANSFORMATION_CONF_ROOT_BIN, <<"message_transformation">>).
|
||||||
|
-define(LOCAL_OPTIONS, #{rawconf_with_defaults => true, persistent => false}).
|
||||||
-define(TIMEOUT, 30000).
|
-define(TIMEOUT, 30000).
|
||||||
|
|
||||||
|
%% All 'cluster.*' keys, except for 'cluster.link', should also be treated as read-only.
|
||||||
|
-define(READONLY_ROOT_KEYS, [rpc, node]).
|
||||||
|
|
||||||
-dialyzer({no_match, [load/0]}).
|
-dialyzer({no_match, [load/0]}).
|
||||||
|
|
||||||
load() ->
|
load() ->
|
||||||
|
@ -312,8 +318,14 @@ get_config(Key) ->
|
||||||
load_config(Path, Opts) when is_list(Path) ->
|
load_config(Path, Opts) when is_list(Path) ->
|
||||||
case hocon:files([Path]) of
|
case hocon:files([Path]) of
|
||||||
{ok, RawConf} when RawConf =:= #{} ->
|
{ok, RawConf} when RawConf =:= #{} ->
|
||||||
emqx_ctl:warning("load ~ts is empty~n", [Path]),
|
case filelib:is_regular(Path) of
|
||||||
{error, empty_hocon_file};
|
true ->
|
||||||
|
emqx_ctl:warning("load ~ts is empty~n", [Path]),
|
||||||
|
{error, #{cause => empty_hocon_file, path => Path}};
|
||||||
|
false ->
|
||||||
|
emqx_ctl:warning("~ts file is not found~n", [Path]),
|
||||||
|
{error, #{cause => not_a_file, path => Path}}
|
||||||
|
end;
|
||||||
{ok, RawConf} ->
|
{ok, RawConf} ->
|
||||||
load_config_from_raw(RawConf, Opts);
|
load_config_from_raw(RawConf, Opts);
|
||||||
{error, Reason} ->
|
{error, Reason} ->
|
||||||
|
@ -339,25 +351,21 @@ load_config_from_raw(RawConf0, Opts) ->
|
||||||
RawConf1 = emqx_config:upgrade_raw_conf(SchemaMod, RawConf0),
|
RawConf1 = emqx_config:upgrade_raw_conf(SchemaMod, RawConf0),
|
||||||
case check_config(RawConf1, Opts) of
|
case check_config(RawConf1, Opts) of
|
||||||
{ok, RawConf} ->
|
{ok, RawConf} ->
|
||||||
%% It has been ensured that the connector is always the first configuration to be updated.
|
case update_cluster_links(cluster, RawConf, Opts) of
|
||||||
%% However, when deleting the connector, we need to clean up the dependent actions/sources first;
|
ok ->
|
||||||
%% otherwise, the deletion will fail.
|
%% It has been ensured that the connector is always the first configuration to be updated.
|
||||||
%% notice: we can't create a action/sources before connector.
|
%% However, when deleting the connector, we need to clean up the dependent actions/sources first;
|
||||||
uninstall(<<"actions">>, RawConf, Opts),
|
%% otherwise, the deletion will fail.
|
||||||
uninstall(<<"sources">>, RawConf, Opts),
|
%% notice: we can't create a action/sources before connector.
|
||||||
Error =
|
uninstall(<<"actions">>, RawConf, Opts),
|
||||||
lists:filtermap(
|
uninstall(<<"sources">>, RawConf, Opts),
|
||||||
fun({K, V}) ->
|
Error = update_config_cluster(Opts, RawConf),
|
||||||
case update_config_cluster(K, V, Opts) of
|
case iolist_to_binary(Error) of
|
||||||
ok -> false;
|
<<"">> -> ok;
|
||||||
{error, Msg} -> {true, Msg}
|
ErrorBin -> {error, ErrorBin}
|
||||||
end
|
end;
|
||||||
end,
|
{error, Reason} ->
|
||||||
to_sorted_list(RawConf)
|
{error, Reason}
|
||||||
),
|
|
||||||
case iolist_to_binary(Error) of
|
|
||||||
<<"">> -> ok;
|
|
||||||
ErrorBin -> {error, ErrorBin}
|
|
||||||
end;
|
end;
|
||||||
{error, ?UPDATE_READONLY_KEYS_PROHIBITED, ReadOnlyKeyStr} ->
|
{error, ?UPDATE_READONLY_KEYS_PROHIBITED, ReadOnlyKeyStr} ->
|
||||||
Reason = iolist_to_binary(
|
Reason = iolist_to_binary(
|
||||||
|
@ -384,6 +392,26 @@ load_config_from_raw(RawConf0, Opts) ->
|
||||||
{error, Errors}
|
{error, Errors}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
update_config_cluster(Opts, RawConf) ->
|
||||||
|
lists:filtermap(
|
||||||
|
fun({K, V}) ->
|
||||||
|
case update_config_cluster(K, V, Opts) of
|
||||||
|
ok -> false;
|
||||||
|
{error, Msg} -> {true, Msg}
|
||||||
|
end
|
||||||
|
end,
|
||||||
|
to_sorted_list(RawConf)
|
||||||
|
).
|
||||||
|
|
||||||
|
update_cluster_links(cluster, #{<<"cluster">> := #{<<"links">> := Links}}, Opts) ->
|
||||||
|
Res = emqx_conf:update([<<"cluster">>, <<"links">>], Links, ?OPTIONS),
|
||||||
|
check_res(<<"cluster.links">>, Res, Links, Opts);
|
||||||
|
update_cluster_links(local, #{<<"cluster">> := #{<<"links">> := Links}}, Opts) ->
|
||||||
|
Res = emqx:update_config([<<"cluster">>, <<"links">>], Links, ?LOCAL_OPTIONS),
|
||||||
|
check_res(node(), <<"cluster.links">>, Res, Links, Opts);
|
||||||
|
update_cluster_links(_, _, _) ->
|
||||||
|
ok.
|
||||||
|
|
||||||
uninstall(ActionOrSource, Conf, #{mode := replace}) ->
|
uninstall(ActionOrSource, Conf, #{mode := replace}) ->
|
||||||
case maps:find(ActionOrSource, Conf) of
|
case maps:find(ActionOrSource, Conf) of
|
||||||
{ok, New} ->
|
{ok, New} ->
|
||||||
|
@ -443,7 +471,6 @@ update_config_cluster(Key, NewConf, #{mode := merge} = Opts) ->
|
||||||
update_config_cluster(Key, Value, #{mode := replace} = Opts) ->
|
update_config_cluster(Key, Value, #{mode := replace} = Opts) ->
|
||||||
check_res(Key, emqx_conf:update([Key], Value, ?OPTIONS), Value, Opts).
|
check_res(Key, emqx_conf:update([Key], Value, ?OPTIONS), Value, Opts).
|
||||||
|
|
||||||
-define(LOCAL_OPTIONS, #{rawconf_with_defaults => true, persistent => false}).
|
|
||||||
update_config_local(
|
update_config_local(
|
||||||
?EMQX_AUTHORIZATION_CONFIG_ROOT_NAME_BINARY = Key,
|
?EMQX_AUTHORIZATION_CONFIG_ROOT_NAME_BINARY = Key,
|
||||||
Conf,
|
Conf,
|
||||||
|
@ -503,21 +530,20 @@ suggest_msg(_, _) ->
|
||||||
<<"">>.
|
<<"">>.
|
||||||
|
|
||||||
check_config(Conf0, Opts) ->
|
check_config(Conf0, Opts) ->
|
||||||
case check_keys_is_not_readonly(Conf0, Opts) of
|
maybe
|
||||||
{ok, Conf1} ->
|
{ok, Conf1} ?= check_keys_is_not_readonly(Conf0, Opts),
|
||||||
Conf = emqx_config:fill_defaults(Conf1),
|
{ok, Conf2} ?= check_cluster_keys(Conf1, Opts),
|
||||||
case check_config_schema(Conf) of
|
Conf3 = emqx_config:fill_defaults(Conf2),
|
||||||
ok -> {ok, Conf};
|
ok ?= check_config_schema(Conf3),
|
||||||
{error, Reason} -> {error, Reason}
|
{ok, Conf3}
|
||||||
end;
|
else
|
||||||
Error ->
|
Error -> Error
|
||||||
Error
|
|
||||||
end.
|
end.
|
||||||
|
|
||||||
check_keys_is_not_readonly(Conf, Opts) ->
|
check_keys_is_not_readonly(Conf, Opts) ->
|
||||||
IgnoreReadonly = maps:get(ignore_readonly, Opts, false),
|
IgnoreReadonly = maps:get(ignore_readonly, Opts, false),
|
||||||
Keys = maps:keys(Conf),
|
Keys = maps:keys(Conf),
|
||||||
ReadOnlyKeys = [atom_to_binary(K) || K <- ?READONLY_KEYS],
|
ReadOnlyKeys = [atom_to_binary(K) || K <- ?READONLY_ROOT_KEYS],
|
||||||
case lists:filter(fun(K) -> lists:member(K, Keys) end, ReadOnlyKeys) of
|
case lists:filter(fun(K) -> lists:member(K, Keys) end, ReadOnlyKeys) of
|
||||||
[] ->
|
[] ->
|
||||||
{ok, Conf};
|
{ok, Conf};
|
||||||
|
@ -529,6 +555,22 @@ check_keys_is_not_readonly(Conf, Opts) ->
|
||||||
{error, ?UPDATE_READONLY_KEYS_PROHIBITED, BadKeysStr}
|
{error, ?UPDATE_READONLY_KEYS_PROHIBITED, BadKeysStr}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
check_cluster_keys(Conf = #{<<"cluster">> := Cluster}, Opts) ->
|
||||||
|
IgnoreReadonly = maps:get(ignore_readonly, Opts, false),
|
||||||
|
case maps:keys(Cluster) -- [<<"links">>] of
|
||||||
|
[] ->
|
||||||
|
{ok, Conf};
|
||||||
|
Keys when IgnoreReadonly ->
|
||||||
|
?SLOG(info, #{msg => "readonly_root_keys_ignored", keys => Keys}),
|
||||||
|
{ok, Conf#{<<"cluster">> => maps:with([<<"links">>], Cluster)}};
|
||||||
|
Keys ->
|
||||||
|
BadKeys = [<<"cluster.", K/binary>> || K <- Keys],
|
||||||
|
BadKeysStr = lists:join(<<",">>, BadKeys),
|
||||||
|
{error, ?UPDATE_READONLY_KEYS_PROHIBITED, BadKeysStr}
|
||||||
|
end;
|
||||||
|
check_cluster_keys(Conf, _Opts) ->
|
||||||
|
{ok, Conf}.
|
||||||
|
|
||||||
check_config_schema(Conf) ->
|
check_config_schema(Conf) ->
|
||||||
SchemaMod = emqx_conf:schema_module(),
|
SchemaMod = emqx_conf:schema_module(),
|
||||||
Fold = fun({Key, Value}, Acc) ->
|
Fold = fun({Key, Value}, Acc) ->
|
||||||
|
@ -587,7 +629,7 @@ filter_readonly_config(Raw) ->
|
||||||
try
|
try
|
||||||
RawDefault = fill_defaults(Raw),
|
RawDefault = fill_defaults(Raw),
|
||||||
_ = emqx_config:check_config(SchemaMod, RawDefault),
|
_ = emqx_config:check_config(SchemaMod, RawDefault),
|
||||||
{ok, maps:without([atom_to_binary(K) || K <- ?READONLY_KEYS], Raw)}
|
{ok, maps:without([atom_to_binary(K) || K <- ?READONLY_ROOT_KEYS], Raw)}
|
||||||
catch
|
catch
|
||||||
throw:Error ->
|
throw:Error ->
|
||||||
?SLOG(error, #{
|
?SLOG(error, #{
|
||||||
|
@ -598,23 +640,28 @@ filter_readonly_config(Raw) ->
|
||||||
end.
|
end.
|
||||||
|
|
||||||
reload_config(AllConf, Opts) ->
|
reload_config(AllConf, Opts) ->
|
||||||
uninstall(<<"actions">>, AllConf, Opts),
|
case update_cluster_links(local, AllConf, Opts) of
|
||||||
uninstall(<<"sources">>, AllConf, Opts),
|
ok ->
|
||||||
Fold = fun({Key, Conf}, Acc) ->
|
uninstall(<<"actions">>, AllConf, Opts),
|
||||||
case update_config_local(Key, Conf, Opts) of
|
uninstall(<<"sources">>, AllConf, Opts),
|
||||||
ok ->
|
Fold = fun({Key, Conf}, Acc) ->
|
||||||
Acc;
|
case update_config_local(Key, Conf, Opts) of
|
||||||
Error ->
|
ok ->
|
||||||
?SLOG(error, #{
|
Acc;
|
||||||
msg => "failed_to_reload_etc_config",
|
Error ->
|
||||||
key => Key,
|
?SLOG(error, #{
|
||||||
value => Conf,
|
msg => "failed_to_reload_etc_config",
|
||||||
error => Error
|
key => Key,
|
||||||
}),
|
value => Conf,
|
||||||
[{Key, Error} | Acc]
|
error => Error
|
||||||
end
|
}),
|
||||||
end,
|
[{Key, Error} | Acc]
|
||||||
sorted_fold(Fold, AllConf).
|
end
|
||||||
|
end,
|
||||||
|
sorted_fold(Fold, AllConf);
|
||||||
|
{error, Reason} ->
|
||||||
|
{error, Reason}
|
||||||
|
end.
|
||||||
|
|
||||||
sorted_fold(Func, Conf) ->
|
sorted_fold(Func, Conf) ->
|
||||||
case lists:foldl(Func, [], to_sorted_list(Conf)) of
|
case lists:foldl(Func, [], to_sorted_list(Conf)) of
|
||||||
|
@ -623,10 +670,11 @@ sorted_fold(Func, Conf) ->
|
||||||
end.
|
end.
|
||||||
|
|
||||||
to_sorted_list(Conf0) ->
|
to_sorted_list(Conf0) ->
|
||||||
|
Conf1 = maps:remove(<<"cluster">>, Conf0),
|
||||||
%% connectors > actions/bridges > rule_engine
|
%% connectors > actions/bridges > rule_engine
|
||||||
Keys = [<<"connectors">>, <<"actions">>, <<"bridges">>, <<"rule_engine">>],
|
Keys = [<<"connectors">>, <<"actions">>, <<"bridges">>, <<"rule_engine">>],
|
||||||
{HighPriorities, Conf1} = split_high_priority_conf(Keys, Conf0, []),
|
{HighPriorities, Conf2} = split_high_priority_conf(Keys, Conf1, []),
|
||||||
HighPriorities ++ lists:keysort(1, maps:to_list(Conf1)).
|
HighPriorities ++ lists:keysort(1, maps:to_list(Conf2)).
|
||||||
|
|
||||||
split_high_priority_conf([], Conf0, Acc) ->
|
split_high_priority_conf([], Conf0, Acc) ->
|
||||||
{lists:reverse(Acc), Conf0};
|
{lists:reverse(Acc), Conf0};
|
||||||
|
@ -753,7 +801,7 @@ find_running_confs() ->
|
||||||
lists:map(
|
lists:map(
|
||||||
fun(Node) ->
|
fun(Node) ->
|
||||||
Conf = emqx_conf_proto_v4:get_config(Node, []),
|
Conf = emqx_conf_proto_v4:get_config(Node, []),
|
||||||
{Node, maps:without(?READONLY_KEYS, Conf)}
|
{Node, maps:without(?READONLY_ROOT_KEYS, Conf)}
|
||||||
end,
|
end,
|
||||||
mria:running_nodes()
|
mria:running_nodes()
|
||||||
).
|
).
|
||||||
|
|
|
@ -21,9 +21,11 @@
|
||||||
-include_lib("eunit/include/eunit.hrl").
|
-include_lib("eunit/include/eunit.hrl").
|
||||||
-include_lib("common_test/include/ct.hrl").
|
-include_lib("common_test/include/ct.hrl").
|
||||||
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
|
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
|
||||||
-include("emqx_conf.hrl").
|
|
||||||
-import(emqx_config_SUITE, [prepare_conf_file/3]).
|
-import(emqx_config_SUITE, [prepare_conf_file/3]).
|
||||||
|
|
||||||
|
-define(READONLY_ROOT_KEYS, [rpc, node]).
|
||||||
|
|
||||||
all() ->
|
all() ->
|
||||||
emqx_common_test_helpers:all(?MODULE).
|
emqx_common_test_helpers:all(?MODULE).
|
||||||
|
|
||||||
|
@ -81,7 +83,11 @@ t_load_config(Config) ->
|
||||||
Conf#{<<"sources">> => [emqx_authz_schema:default_authz()]},
|
Conf#{<<"sources">> => [emqx_authz_schema:default_authz()]},
|
||||||
emqx_conf:get_raw([Authz])
|
emqx_conf:get_raw([Authz])
|
||||||
),
|
),
|
||||||
?assertEqual({error, empty_hocon_file}, emqx_conf_cli:conf(["load", "non-exist-file"])),
|
?assertMatch({error, #{cause := not_a_file}}, emqx_conf_cli:conf(["load", "non-exist-file"])),
|
||||||
|
EmptyFile = "empty_file.conf",
|
||||||
|
ok = file:write_file(EmptyFile, <<>>),
|
||||||
|
?assertMatch({error, #{cause := empty_hocon_file}}, emqx_conf_cli:conf(["load", EmptyFile])),
|
||||||
|
ok = file:delete(EmptyFile),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
t_conflict_mix_conf(Config) ->
|
t_conflict_mix_conf(Config) ->
|
||||||
|
@ -203,7 +209,7 @@ t_load_readonly(Config) ->
|
||||||
%% Don't update readonly key
|
%% Don't update readonly key
|
||||||
?assertEqual(Conf, emqx_conf:get_raw([Key]))
|
?assertEqual(Conf, emqx_conf:get_raw([Key]))
|
||||||
end,
|
end,
|
||||||
?READONLY_KEYS
|
?READONLY_ROOT_KEYS
|
||||||
),
|
),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
|
|
|
@ -309,7 +309,7 @@ t_configs_key(_Config) ->
|
||||||
},
|
},
|
||||||
ReadOnlyBin = iolist_to_binary(hocon_pp:do(ReadOnlyConf, #{})),
|
ReadOnlyBin = iolist_to_binary(hocon_pp:do(ReadOnlyConf, #{})),
|
||||||
{error, ReadOnlyError} = update_configs_with_binary(ReadOnlyBin),
|
{error, ReadOnlyError} = update_configs_with_binary(ReadOnlyBin),
|
||||||
?assertEqual(<<"{\"errors\":\"Cannot update read-only key 'cluster'.\"}">>, ReadOnlyError),
|
?assertMatch(<<"{\"errors\":\"Cannot update read-only key 'cluster", _/binary>>, ReadOnlyError),
|
||||||
?assertMatch({ok, <<>>}, update_configs_with_binary(ReadOnlyBin, _IgnoreReadonly = true)),
|
?assertMatch({ok, <<>>}, update_configs_with_binary(ReadOnlyBin, _IgnoreReadonly = true)),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue