feat(schema validation): implement support for `ctl conf load`
This commit is contained in:
parent
fd949240c0
commit
34a29e6363
|
@ -36,6 +36,7 @@
|
||||||
-define(CONF, conf).
|
-define(CONF, conf).
|
||||||
-define(AUDIT_MOD, audit).
|
-define(AUDIT_MOD, audit).
|
||||||
-define(UPDATE_READONLY_KEYS_PROHIBITED, <<"Cannot update read-only key '~s'.">>).
|
-define(UPDATE_READONLY_KEYS_PROHIBITED, <<"Cannot update read-only key '~s'.">>).
|
||||||
|
-define(SCHEMA_VALIDATION_CONF_ROOT_BIN, <<"schema_validation">>).
|
||||||
|
|
||||||
-dialyzer({no_match, [load/0]}).
|
-dialyzer({no_match, [load/0]}).
|
||||||
|
|
||||||
|
@ -330,6 +331,10 @@ update_config_cluster(
|
||||||
#{mode := merge} = Opts
|
#{mode := merge} = Opts
|
||||||
) ->
|
) ->
|
||||||
check_res(Key, emqx_authn:merge_config(Conf), Conf, Opts);
|
check_res(Key, emqx_authn:merge_config(Conf), Conf, Opts);
|
||||||
|
update_config_cluster(?SCHEMA_VALIDATION_CONF_ROOT_BIN = Key, NewConf, #{mode := merge} = Opts) ->
|
||||||
|
check_res(Key, emqx_conf:update([Key], {merge, NewConf}, ?OPTIONS), NewConf, Opts);
|
||||||
|
update_config_cluster(?SCHEMA_VALIDATION_CONF_ROOT_BIN = Key, NewConf, #{mode := replace} = Opts) ->
|
||||||
|
check_res(Key, emqx_conf:update([Key], {replace, NewConf}, ?OPTIONS), NewConf, Opts);
|
||||||
update_config_cluster(Key, NewConf, #{mode := merge} = Opts) ->
|
update_config_cluster(Key, NewConf, #{mode := merge} = Opts) ->
|
||||||
Merged = merge_conf(Key, NewConf),
|
Merged = merge_conf(Key, NewConf),
|
||||||
check_res(Key, emqx_conf:update([Key], Merged, ?OPTIONS), NewConf, Opts);
|
check_res(Key, emqx_conf:update([Key], Merged, ?OPTIONS), NewConf, Opts);
|
||||||
|
|
|
@ -65,12 +65,14 @@
|
||||||
|
|
||||||
-spec add_handler() -> ok.
|
-spec add_handler() -> ok.
|
||||||
add_handler() ->
|
add_handler() ->
|
||||||
|
ok = emqx_config_handler:add_handler([?CONF_ROOT], ?MODULE),
|
||||||
ok = emqx_config_handler:add_handler(?VALIDATIONS_CONF_PATH, ?MODULE),
|
ok = emqx_config_handler:add_handler(?VALIDATIONS_CONF_PATH, ?MODULE),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
-spec remove_handler() -> ok.
|
-spec remove_handler() -> ok.
|
||||||
remove_handler() ->
|
remove_handler() ->
|
||||||
ok = emqx_config_handler:remove_handler(?VALIDATIONS_CONF_PATH),
|
ok = emqx_config_handler:remove_handler(?VALIDATIONS_CONF_PATH),
|
||||||
|
ok = emqx_config_handler:remove_handler([?CONF_ROOT]),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
load() ->
|
load() ->
|
||||||
|
@ -185,7 +187,12 @@ pre_config_update(?VALIDATIONS_CONF_PATH, {update, Validation}, OldValidations)
|
||||||
pre_config_update(?VALIDATIONS_CONF_PATH, {delete, Validation}, OldValidations) ->
|
pre_config_update(?VALIDATIONS_CONF_PATH, {delete, Validation}, OldValidations) ->
|
||||||
delete(OldValidations, Validation);
|
delete(OldValidations, Validation);
|
||||||
pre_config_update(?VALIDATIONS_CONF_PATH, {reorder, Order}, OldValidations) ->
|
pre_config_update(?VALIDATIONS_CONF_PATH, {reorder, Order}, OldValidations) ->
|
||||||
reorder(OldValidations, Order).
|
reorder(OldValidations, Order);
|
||||||
|
pre_config_update([?CONF_ROOT], {merge, NewConfig}, OldConfig) ->
|
||||||
|
#{resulting_config := Config} = prepare_config_merge(NewConfig, OldConfig),
|
||||||
|
{ok, Config};
|
||||||
|
pre_config_update([?CONF_ROOT], {replace, NewConfig}, _OldConfig) ->
|
||||||
|
{ok, NewConfig}.
|
||||||
|
|
||||||
post_config_update(?VALIDATIONS_CONF_PATH, {append, #{<<"name">> := Name}}, New, _Old, _AppEnvs) ->
|
post_config_update(?VALIDATIONS_CONF_PATH, {append, #{<<"name">> := Name}}, New, _Old, _AppEnvs) ->
|
||||||
{Pos, Validation} = fetch_with_index(New, Name),
|
{Pos, Validation} = fetch_with_index(New, Name),
|
||||||
|
@ -202,57 +209,77 @@ post_config_update(?VALIDATIONS_CONF_PATH, {delete, Name}, _New, Old, _AppEnvs)
|
||||||
ok;
|
ok;
|
||||||
post_config_update(?VALIDATIONS_CONF_PATH, {reorder, _Order}, New, _Old, _AppEnvs) ->
|
post_config_update(?VALIDATIONS_CONF_PATH, {reorder, _Order}, New, _Old, _AppEnvs) ->
|
||||||
ok = emqx_schema_validation_registry:reindex_positions(New),
|
ok = emqx_schema_validation_registry:reindex_positions(New),
|
||||||
ok.
|
ok;
|
||||||
|
post_config_update([?CONF_ROOT], {merge, _}, ResultingConfig, Old, _AppEnvs) ->
|
||||||
|
#{validations := ResultingValidations} = ResultingConfig,
|
||||||
|
#{validations := OldValidations} = Old,
|
||||||
|
#{added := NewValidations0} =
|
||||||
|
emqx_utils:diff_lists(
|
||||||
|
ResultingValidations,
|
||||||
|
OldValidations,
|
||||||
|
fun(#{name := N}) -> N end
|
||||||
|
),
|
||||||
|
NewValidations =
|
||||||
|
lists:map(
|
||||||
|
fun(#{name := Name}) ->
|
||||||
|
{Pos, Validation} = fetch_with_index(ResultingValidations, Name),
|
||||||
|
ok = emqx_schema_validation_registry:insert(Pos, Validation),
|
||||||
|
#{name => Name, pos => Pos}
|
||||||
|
end,
|
||||||
|
NewValidations0
|
||||||
|
),
|
||||||
|
{ok, #{new_validations => NewValidations}};
|
||||||
|
post_config_update([?CONF_ROOT], {replace, Input}, ResultingConfig, Old, _AppEnvs) ->
|
||||||
|
#{
|
||||||
|
new_validations := NewValidations,
|
||||||
|
changed_validations := ChangedValidations0,
|
||||||
|
deleted_validations := DeletedValidations
|
||||||
|
} = prepare_config_replace(Input, Old),
|
||||||
|
#{validations := ResultingValidations} = ResultingConfig,
|
||||||
|
#{validations := OldValidations} = Old,
|
||||||
|
lists:foreach(
|
||||||
|
fun(Name) ->
|
||||||
|
{_Pos, Validation} = fetch_with_index(OldValidations, Name),
|
||||||
|
ok = emqx_schema_validation_registry:delete(Validation)
|
||||||
|
end,
|
||||||
|
DeletedValidations
|
||||||
|
),
|
||||||
|
lists:foreach(
|
||||||
|
fun(Name) ->
|
||||||
|
{Pos, Validation} = fetch_with_index(ResultingValidations, Name),
|
||||||
|
ok = emqx_schema_validation_registry:insert(Pos, Validation)
|
||||||
|
end,
|
||||||
|
NewValidations
|
||||||
|
),
|
||||||
|
ChangedValidations =
|
||||||
|
lists:map(
|
||||||
|
fun(Name) ->
|
||||||
|
{_Pos, OldValidation} = fetch_with_index(OldValidations, Name),
|
||||||
|
{Pos, NewValidation} = fetch_with_index(ResultingValidations, Name),
|
||||||
|
ok = emqx_schema_validation_registry:update(OldValidation, Pos, NewValidation),
|
||||||
|
#{name => Name, pos => Pos}
|
||||||
|
end,
|
||||||
|
ChangedValidations0
|
||||||
|
),
|
||||||
|
ok = emqx_schema_validation_registry:reindex_positions(ResultingValidations),
|
||||||
|
{ok, #{changed_validations => ChangedValidations}}.
|
||||||
|
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
%% `emqx_config_backup' API
|
%% `emqx_config_backup' API
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
|
|
||||||
import_config(#{?CONF_ROOT_BIN := RawConf0}) ->
|
import_config(#{?CONF_ROOT_BIN := RawConf0}) ->
|
||||||
OldRawValidations = emqx_config:get_raw([?CONF_ROOT_BIN, <<"validations">>], []),
|
Result = emqx_conf:update(
|
||||||
{ImportedRawValidations, RawConf1} =
|
[?CONF_ROOT],
|
||||||
case maps:take(<<"validations">>, RawConf0) of
|
{merge, RawConf0},
|
||||||
error ->
|
#{override_to => cluster, rawconf_with_defaults => true}
|
||||||
{[], RawConf0};
|
|
||||||
{V, R} ->
|
|
||||||
{V, R}
|
|
||||||
end,
|
|
||||||
%% If there's a matching validation, we don't overwrite it. We don't remove any
|
|
||||||
%% validations, either.
|
|
||||||
#{added := NewRawValidations} = emqx_utils:diff_lists(
|
|
||||||
ImportedRawValidations,
|
|
||||||
OldRawValidations,
|
|
||||||
fun(#{<<"name">> := N}) -> N end
|
|
||||||
),
|
|
||||||
OtherConfs = maps:to_list(RawConf1),
|
|
||||||
Result = emqx_utils:foldl_while(
|
|
||||||
fun
|
|
||||||
({validation, RawValidation}, Acc) ->
|
|
||||||
case insert(RawValidation) of
|
|
||||||
{ok, _} ->
|
|
||||||
#{<<"name">> := N} = RawValidation,
|
|
||||||
ChangedPath = [?CONF_ROOT, validations, '?', N],
|
|
||||||
{cont, [ChangedPath | Acc]};
|
|
||||||
{error, _} = Error ->
|
|
||||||
{halt, Error}
|
|
||||||
end;
|
|
||||||
({Key, RawConf}, Acc) ->
|
|
||||||
case emqx_conf:update([?CONF_ROOT_BIN, Key], RawConf, #{override_to => cluster}) of
|
|
||||||
{ok, _} ->
|
|
||||||
ChangedPath = [?CONF_ROOT, Key],
|
|
||||||
{conf, [ChangedPath | Acc]};
|
|
||||||
{error, _} = Error ->
|
|
||||||
{halt, Error}
|
|
||||||
end
|
|
||||||
end,
|
|
||||||
[],
|
|
||||||
OtherConfs ++
|
|
||||||
[{validation, NewValidation} || NewValidation <- NewRawValidations]
|
|
||||||
),
|
),
|
||||||
case Result of
|
case Result of
|
||||||
{error, Reason} ->
|
{error, Reason} ->
|
||||||
{error, #{root_key => ?CONF_ROOT, reason => Reason}};
|
{error, #{root_key => ?CONF_ROOT, reason => Reason}};
|
||||||
ChangedPaths ->
|
{ok, _} ->
|
||||||
|
Keys0 = maps:keys(RawConf0),
|
||||||
|
ChangedPaths = Keys0 -- [<<"validations">>],
|
||||||
{ok, #{root_key => ?CONF_ROOT, changed => ChangedPaths}}
|
{ok, #{root_key => ?CONF_ROOT, changed => ChangedPaths}}
|
||||||
end;
|
end;
|
||||||
import_config(_RawConf) ->
|
import_config(_RawConf) ->
|
||||||
|
@ -530,3 +557,55 @@ run_schema_validation_failed_hook(Message, Validation) ->
|
||||||
#{name := Name} = Validation,
|
#{name := Name} = Validation,
|
||||||
ValidationContext = #{name => Name},
|
ValidationContext = #{name => Name},
|
||||||
emqx_hooks:run('schema.validation_failed', [Message, ValidationContext]).
|
emqx_hooks:run('schema.validation_failed', [Message, ValidationContext]).
|
||||||
|
|
||||||
|
%% "Merging" in the context of the validation array means:
|
||||||
|
%% * Existing validations (identified by `name') are left untouched.
|
||||||
|
%% * No validations are removed.
|
||||||
|
%% * New validations are appended to the existing list.
|
||||||
|
%% * Existing validations are not reordered.
|
||||||
|
prepare_config_merge(NewConfig0, OldConfig) ->
|
||||||
|
{ImportedRawValidations, NewConfigNoValidations} =
|
||||||
|
case maps:take(<<"validations">>, NewConfig0) of
|
||||||
|
error ->
|
||||||
|
{[], NewConfig0};
|
||||||
|
{V, R} ->
|
||||||
|
{V, R}
|
||||||
|
end,
|
||||||
|
OldRawValidations = maps:get(<<"validations">>, OldConfig, []),
|
||||||
|
#{added := NewRawValidations} = emqx_utils:diff_lists(
|
||||||
|
ImportedRawValidations,
|
||||||
|
OldRawValidations,
|
||||||
|
fun(#{<<"name">> := N}) -> N end
|
||||||
|
),
|
||||||
|
Config0 = emqx_utils_maps:deep_merge(OldConfig, NewConfigNoValidations),
|
||||||
|
Config = maps:update_with(
|
||||||
|
<<"validations">>,
|
||||||
|
fun(OldVs) -> OldVs ++ NewRawValidations end,
|
||||||
|
NewRawValidations,
|
||||||
|
Config0
|
||||||
|
),
|
||||||
|
#{
|
||||||
|
new_validations => NewRawValidations,
|
||||||
|
resulting_config => Config
|
||||||
|
}.
|
||||||
|
|
||||||
|
prepare_config_replace(NewConfig, OldConfig) ->
|
||||||
|
ImportedRawValidations = maps:get(<<"validations">>, NewConfig, []),
|
||||||
|
OldValidations = maps:get(validations, OldConfig, []),
|
||||||
|
%% Since, at this point, we have an input raw config but a parsed old config, we
|
||||||
|
%% project both to the to have only their names, and consider common names as changed.
|
||||||
|
#{
|
||||||
|
added := NewValidations,
|
||||||
|
removed := DeletedValidations,
|
||||||
|
changed := ChangedValidations0,
|
||||||
|
identical := ChangedValidations1
|
||||||
|
} = emqx_utils:diff_lists(
|
||||||
|
lists:map(fun(#{<<"name">> := N}) -> N end, ImportedRawValidations),
|
||||||
|
lists:map(fun(#{name := N}) -> N end, OldValidations),
|
||||||
|
fun(N) -> N end
|
||||||
|
),
|
||||||
|
#{
|
||||||
|
new_validations => NewValidations,
|
||||||
|
changed_validations => ChangedValidations0 ++ ChangedValidations1,
|
||||||
|
deleted_validations => DeletedValidations
|
||||||
|
}.
|
||||||
|
|
|
@ -461,6 +461,12 @@ assert_monitor_metrics() ->
|
||||||
),
|
),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
|
normalize_validations(RawValidations) ->
|
||||||
|
[
|
||||||
|
V#{<<"topics">> := [T]}
|
||||||
|
|| #{<<"topics">> := T} = V <- RawValidations
|
||||||
|
].
|
||||||
|
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
%% Testcases
|
%% Testcases
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
|
@ -1287,16 +1293,94 @@ t_import_config_backup(_Config) ->
|
||||||
|
|
||||||
{204, _} = import_backup(BackupName),
|
{204, _} = import_backup(BackupName),
|
||||||
|
|
||||||
ExpectedValidations = [
|
ExpectedValidations = normalize_validations([
|
||||||
V#{<<"topics">> := [T]}
|
Validation1A,
|
||||||
|| #{<<"topics">> := T} = V <- [
|
Validation2A,
|
||||||
Validation1A,
|
Validation3,
|
||||||
Validation2A,
|
Validation4
|
||||||
Validation3,
|
]),
|
||||||
Validation4
|
|
||||||
]
|
|
||||||
],
|
|
||||||
?assertMatch({200, ExpectedValidations}, list()),
|
?assertMatch({200, ExpectedValidations}, list()),
|
||||||
?assertIndexOrder([Name1, Name2, Name3, Name4], <<"t/a">>),
|
?assertIndexOrder([Name1, Name2, Name3, Name4], <<"t/a">>),
|
||||||
|
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
|
%% Tests that importing configurations from the CLI interface work.
|
||||||
|
t_load_config(_Config) ->
|
||||||
|
Name1 = <<"1">>,
|
||||||
|
Check1A = sql_check(<<"select 1 where true">>),
|
||||||
|
Validation1A = validation(Name1, [Check1A]),
|
||||||
|
{201, _} = insert(Validation1A),
|
||||||
|
|
||||||
|
Name2 = <<"2">>,
|
||||||
|
Check2A = sql_check(<<"select 2 where true">>),
|
||||||
|
Validation2A = validation(Name2, [Check2A]),
|
||||||
|
{201, _} = insert(Validation2A),
|
||||||
|
|
||||||
|
Name3 = <<"3">>,
|
||||||
|
Check3 = sql_check(<<"select 3 where true">>),
|
||||||
|
Validation3 = validation(Name3, [Check3]),
|
||||||
|
{201, _} = insert(Validation3),
|
||||||
|
|
||||||
|
%% Config to load
|
||||||
|
%% Will replace existing config
|
||||||
|
Check2B = sql_check(<<"select 2 where false">>),
|
||||||
|
Validation2B = validation(Name2, [Check2B]),
|
||||||
|
|
||||||
|
%% Will replace existing config
|
||||||
|
Check1B = sql_check(<<"select 1 where false">>),
|
||||||
|
Validation1B = validation(Name1, [Check1B]),
|
||||||
|
|
||||||
|
%% New validation; should be appended
|
||||||
|
Name4 = <<"4">>,
|
||||||
|
Check4 = sql_check(<<"select 4 where true">>),
|
||||||
|
Validation4 = validation(Name4, [Check4]),
|
||||||
|
|
||||||
|
ConfRootBin = <<"schema_validation">>,
|
||||||
|
ConfigToLoad1 = #{
|
||||||
|
ConfRootBin => #{
|
||||||
|
<<"validations">> => [Validation2B, Validation1B, Validation4]
|
||||||
|
}
|
||||||
|
},
|
||||||
|
ConfigToLoadBin1 = iolist_to_binary(hocon_pp:do(ConfigToLoad1, #{})),
|
||||||
|
?assertMatch(ok, emqx_conf_cli:load_config(ConfigToLoadBin1, #{mode => merge})),
|
||||||
|
ExpectedValidations1 = normalize_validations([
|
||||||
|
Validation1A,
|
||||||
|
Validation2A,
|
||||||
|
Validation3,
|
||||||
|
Validation4
|
||||||
|
]),
|
||||||
|
?assertMatch(
|
||||||
|
#{
|
||||||
|
ConfRootBin := #{
|
||||||
|
<<"validations">> := ExpectedValidations1
|
||||||
|
}
|
||||||
|
},
|
||||||
|
emqx_conf_cli:get_config(<<"schema_validation">>)
|
||||||
|
),
|
||||||
|
?assertIndexOrder([Name1, Name2, Name3, Name4], <<"t/a">>),
|
||||||
|
|
||||||
|
%% Replace
|
||||||
|
Check4B = sql_check(<<"select 4, true where true">>),
|
||||||
|
Validation4B = validation(Name4, [Check4B]),
|
||||||
|
|
||||||
|
Name5 = <<"5">>,
|
||||||
|
Check5 = sql_check(<<"select 5 where true">>),
|
||||||
|
Validation5 = validation(Name5, [Check5]),
|
||||||
|
|
||||||
|
ConfigToLoad2 = #{
|
||||||
|
ConfRootBin => #{<<"validations">> => [Validation4B, Validation3, Validation5]}
|
||||||
|
},
|
||||||
|
ConfigToLoadBin2 = iolist_to_binary(hocon_pp:do(ConfigToLoad2, #{})),
|
||||||
|
?assertMatch(ok, emqx_conf_cli:load_config(ConfigToLoadBin2, #{mode => replace})),
|
||||||
|
ExpectedValidations2 = normalize_validations([Validation4B, Validation3, Validation5]),
|
||||||
|
?assertMatch(
|
||||||
|
#{
|
||||||
|
ConfRootBin := #{
|
||||||
|
<<"validations">> := ExpectedValidations2
|
||||||
|
}
|
||||||
|
},
|
||||||
|
emqx_conf_cli:get_config(<<"schema_validation">>)
|
||||||
|
),
|
||||||
|
?assertIndexOrder([Name4, Name3, Name5], <<"t/a">>),
|
||||||
|
|
||||||
|
ok.
|
||||||
|
|
|
@ -751,7 +751,6 @@ safe_filename(Filename) when is_list(Filename) ->
|
||||||
when
|
when
|
||||||
Func :: fun((T) -> any()),
|
Func :: fun((T) -> any()),
|
||||||
T :: any().
|
T :: any().
|
||||||
|
|
||||||
diff_lists(New, Old, KeyFunc) when is_list(New) andalso is_list(Old) ->
|
diff_lists(New, Old, KeyFunc) when is_list(New) andalso is_list(Old) ->
|
||||||
Removed =
|
Removed =
|
||||||
lists:foldl(
|
lists:foldl(
|
||||||
|
|
Loading…
Reference in New Issue