Merge pull request #13048 from thalesmg/sv-backup-r57-20240514

feat(schema validation): implement backup restore
This commit is contained in:
zhongwencool 2024-05-17 09:47:04 +08:00 committed by GitHub
commit 1ac7aa151e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 313 additions and 3 deletions

View File

@ -36,6 +36,7 @@
-define(CONF, conf).
-define(AUDIT_MOD, audit).
-define(UPDATE_READONLY_KEYS_PROHIBITED, <<"Cannot update read-only key '~s'.">>).
-define(SCHEMA_VALIDATION_CONF_ROOT_BIN, <<"schema_validation">>).
-dialyzer({no_match, [load/0]}).
@ -330,6 +331,10 @@ update_config_cluster(
#{mode := merge} = Opts
) ->
check_res(Key, emqx_authn:merge_config(Conf), Conf, Opts);
update_config_cluster(?SCHEMA_VALIDATION_CONF_ROOT_BIN = Key, NewConf, #{mode := merge} = Opts) ->
check_res(Key, emqx_conf:update([Key], {merge, NewConf}, ?OPTIONS), NewConf, Opts);
update_config_cluster(?SCHEMA_VALIDATION_CONF_ROOT_BIN = Key, NewConf, #{mode := replace} = Opts) ->
check_res(Key, emqx_conf:update([Key], {replace, NewConf}, ?OPTIONS), NewConf, Opts);
update_config_cluster(Key, NewConf, #{mode := merge} = Opts) ->
Merged = merge_conf(Key, NewConf),
check_res(Key, emqx_conf:update([Key], Merged, ?OPTIONS), NewConf, Opts);

View File

@ -35,6 +35,10 @@
%% `emqx_config_handler' API
-export([pre_config_update/3, post_config_update/5]).
%% `emqx_config_backup' API
-behaviour(emqx_config_backup).
-export([import_config/1]).
%% Internal exports
-export([parse_sql_check/1]).
@ -49,6 +53,7 @@
-define(TRACE_TAG, "SCHEMA_VALIDATION").
-define(CONF_ROOT, schema_validation).
-define(CONF_ROOT_BIN, <<"schema_validation">>).
-define(VALIDATIONS_CONF_PATH, [?CONF_ROOT, validations]).
-type validation_name() :: binary().
@ -60,12 +65,14 @@
-spec add_handler() -> ok.
add_handler() ->
ok = emqx_config_handler:add_handler([?CONF_ROOT], ?MODULE),
ok = emqx_config_handler:add_handler(?VALIDATIONS_CONF_PATH, ?MODULE),
ok.
-spec remove_handler() -> ok.
remove_handler() ->
ok = emqx_config_handler:remove_handler(?VALIDATIONS_CONF_PATH),
ok = emqx_config_handler:remove_handler([?CONF_ROOT]),
ok.
load() ->
@ -180,7 +187,12 @@ pre_config_update(?VALIDATIONS_CONF_PATH, {update, Validation}, OldValidations)
pre_config_update(?VALIDATIONS_CONF_PATH, {delete, Validation}, OldValidations) ->
delete(OldValidations, Validation);
pre_config_update(?VALIDATIONS_CONF_PATH, {reorder, Order}, OldValidations) ->
reorder(OldValidations, Order).
reorder(OldValidations, Order);
pre_config_update([?CONF_ROOT], {merge, NewConfig}, OldConfig) ->
#{resulting_config := Config} = prepare_config_merge(NewConfig, OldConfig),
{ok, Config};
pre_config_update([?CONF_ROOT], {replace, NewConfig}, _OldConfig) ->
{ok, NewConfig}.
post_config_update(?VALIDATIONS_CONF_PATH, {append, #{<<"name">> := Name}}, New, _Old, _AppEnvs) ->
{Pos, Validation} = fetch_with_index(New, Name),
@ -197,7 +209,81 @@ post_config_update(?VALIDATIONS_CONF_PATH, {delete, Name}, _New, Old, _AppEnvs)
ok;
post_config_update(?VALIDATIONS_CONF_PATH, {reorder, _Order}, New, _Old, _AppEnvs) ->
ok = emqx_schema_validation_registry:reindex_positions(New),
ok.
ok;
post_config_update([?CONF_ROOT], {merge, _}, ResultingConfig, Old, _AppEnvs) ->
#{validations := ResultingValidations} = ResultingConfig,
#{validations := OldValidations} = Old,
#{added := NewValidations0} =
emqx_utils:diff_lists(
ResultingValidations,
OldValidations,
fun(#{name := N}) -> N end
),
NewValidations =
lists:map(
fun(#{name := Name}) ->
{Pos, Validation} = fetch_with_index(ResultingValidations, Name),
ok = emqx_schema_validation_registry:insert(Pos, Validation),
#{name => Name, pos => Pos}
end,
NewValidations0
),
{ok, #{new_validations => NewValidations}};
post_config_update([?CONF_ROOT], {replace, Input}, ResultingConfig, Old, _AppEnvs) ->
#{
new_validations := NewValidations,
changed_validations := ChangedValidations0,
deleted_validations := DeletedValidations
} = prepare_config_replace(Input, Old),
#{validations := ResultingValidations} = ResultingConfig,
#{validations := OldValidations} = Old,
lists:foreach(
fun(Name) ->
{_Pos, Validation} = fetch_with_index(OldValidations, Name),
ok = emqx_schema_validation_registry:delete(Validation)
end,
DeletedValidations
),
lists:foreach(
fun(Name) ->
{Pos, Validation} = fetch_with_index(ResultingValidations, Name),
ok = emqx_schema_validation_registry:insert(Pos, Validation)
end,
NewValidations
),
ChangedValidations =
lists:map(
fun(Name) ->
{_Pos, OldValidation} = fetch_with_index(OldValidations, Name),
{Pos, NewValidation} = fetch_with_index(ResultingValidations, Name),
ok = emqx_schema_validation_registry:update(OldValidation, Pos, NewValidation),
#{name => Name, pos => Pos}
end,
ChangedValidations0
),
ok = emqx_schema_validation_registry:reindex_positions(ResultingValidations),
{ok, #{changed_validations => ChangedValidations}}.
%%------------------------------------------------------------------------------
%% `emqx_config_backup' API
%%------------------------------------------------------------------------------
import_config(#{?CONF_ROOT_BIN := RawConf0}) ->
Result = emqx_conf:update(
[?CONF_ROOT],
{merge, RawConf0},
#{override_to => cluster, rawconf_with_defaults => true}
),
case Result of
{error, Reason} ->
{error, #{root_key => ?CONF_ROOT, reason => Reason}};
{ok, _} ->
Keys0 = maps:keys(RawConf0),
ChangedPaths = Keys0 -- [<<"validations">>],
{ok, #{root_key => ?CONF_ROOT, changed => ChangedPaths}}
end;
import_config(_RawConf) ->
{ok, #{root_key => ?CONF_ROOT, changed => []}}.
%%------------------------------------------------------------------------------
%% Internal exports
@ -471,3 +557,55 @@ run_schema_validation_failed_hook(Message, Validation) ->
#{name := Name} = Validation,
ValidationContext = #{name => Name},
emqx_hooks:run('schema.validation_failed', [Message, ValidationContext]).
%% "Merging" in the context of the validation array means:
%% * Existing validations (identified by `name') are left untouched.
%% * No validations are removed.
%% * New validations are appended to the existing list.
%% * Existing validations are not reordered.
prepare_config_merge(NewConfig0, OldConfig) ->
{ImportedRawValidations, NewConfigNoValidations} =
case maps:take(<<"validations">>, NewConfig0) of
error ->
{[], NewConfig0};
{V, R} ->
{V, R}
end,
OldRawValidations = maps:get(<<"validations">>, OldConfig, []),
#{added := NewRawValidations} = emqx_utils:diff_lists(
ImportedRawValidations,
OldRawValidations,
fun(#{<<"name">> := N}) -> N end
),
Config0 = emqx_utils_maps:deep_merge(OldConfig, NewConfigNoValidations),
Config = maps:update_with(
<<"validations">>,
fun(OldVs) -> OldVs ++ NewRawValidations end,
NewRawValidations,
Config0
),
#{
new_validations => NewRawValidations,
resulting_config => Config
}.
prepare_config_replace(NewConfig, OldConfig) ->
ImportedRawValidations = maps:get(<<"validations">>, NewConfig, []),
OldValidations = maps:get(validations, OldConfig, []),
%% Since, at this point, we have an input raw config but a parsed old config, we
%% project both to the to have only their names, and consider common names as changed.
#{
added := NewValidations,
removed := DeletedValidations,
changed := ChangedValidations0,
identical := ChangedValidations1
} = emqx_utils:diff_lists(
lists:map(fun(#{<<"name">> := N}) -> N end, ImportedRawValidations),
lists:map(fun(#{name := N}) -> N end, OldValidations),
fun(N) -> N end
),
#{
new_validations => NewValidations,
changed_validations => ChangedValidations0 ++ ChangedValidations1,
deleted_validations => DeletedValidations
}.

View File

@ -229,6 +229,29 @@ monitor_metrics() ->
ct:pal("monitor metrics result:\n ~p", [Res]),
simplify_result(Res).
upload_backup(BackupFilePath) ->
Path = emqx_mgmt_api_test_util:api_path(["data", "files"]),
Res = emqx_mgmt_api_test_util:upload_request(
Path,
BackupFilePath,
"filename",
<<"application/octet-stream">>,
[],
emqx_mgmt_api_test_util:auth_header_()
),
simplify_result(Res).
export_backup() ->
Path = emqx_mgmt_api_test_util:api_path(["data", "export"]),
Res = request(post, Path, []),
simplify_result(Res).
import_backup(BackupName) ->
Path = emqx_mgmt_api_test_util:api_path(["data", "import"]),
Body = #{<<"filename">> => unicode:characters_to_binary(BackupName)},
Res = request(post, Path, Body),
simplify_result(Res).
connect(ClientId) ->
connect(ClientId, _IsPersistent = false).
@ -438,6 +461,12 @@ assert_monitor_metrics() ->
),
ok.
normalize_validations(RawValidations) ->
[
V#{<<"topics">> := [T]}
|| #{<<"topics">> := T} = V <- RawValidations
].
%%------------------------------------------------------------------------------
%% Testcases
%%------------------------------------------------------------------------------
@ -1216,3 +1245,142 @@ t_schema_check_protobuf(_Config) ->
),
ok.
%% Tests that restoring a backup config works.
%% * Existing validations (identified by `name') are left untouched.
%% * No validations are removed.
%% * New validations are appended to the existing list.
%% * Existing validations are not reordered.
t_import_config_backup(_Config) ->
%% Setup backup file.
%% Will clash with existing validation; different order.
Name2 = <<"2">>,
Check2B = sql_check(<<"select 2 where false">>),
Validation2B = validation(Name2, [Check2B]),
{201, _} = insert(Validation2B),
%% Will clash with existing validation.
Name1 = <<"1">>,
Check1B = sql_check(<<"select 1 where false">>),
Validation1B = validation(Name1, [Check1B]),
{201, _} = insert(Validation1B),
%% New validation; should be appended
Name4 = <<"4">>,
Check4 = sql_check(<<"select 4 where true">>),
Validation4 = validation(Name4, [Check4]),
{201, _} = insert(Validation4),
{200, #{<<"filename">> := BackupName}} = export_backup(),
%% Clear this setup and pretend we have other data to begin with.
clear_all_validations(),
{200, []} = list(),
Check1A = sql_check(<<"select 1 where true">>),
Validation1A = validation(Name1, [Check1A]),
{201, _} = insert(Validation1A),
Check2A = sql_check(<<"select 2 where true">>),
Validation2A = validation(Name2, [Check2A]),
{201, _} = insert(Validation2A),
Name3 = <<"3">>,
Check3 = sql_check(<<"select 3 where true">>),
Validation3 = validation(Name3, [Check3]),
{201, _} = insert(Validation3),
{204, _} = import_backup(BackupName),
ExpectedValidations = normalize_validations([
Validation1A,
Validation2A,
Validation3,
Validation4
]),
?assertMatch({200, ExpectedValidations}, list()),
?assertIndexOrder([Name1, Name2, Name3, Name4], <<"t/a">>),
ok.
%% Tests that importing configurations from the CLI interface work.
t_load_config(_Config) ->
Name1 = <<"1">>,
Check1A = sql_check(<<"select 1 where true">>),
Validation1A = validation(Name1, [Check1A]),
{201, _} = insert(Validation1A),
Name2 = <<"2">>,
Check2A = sql_check(<<"select 2 where true">>),
Validation2A = validation(Name2, [Check2A]),
{201, _} = insert(Validation2A),
Name3 = <<"3">>,
Check3 = sql_check(<<"select 3 where true">>),
Validation3 = validation(Name3, [Check3]),
{201, _} = insert(Validation3),
%% Config to load
%% Will replace existing config
Check2B = sql_check(<<"select 2 where false">>),
Validation2B = validation(Name2, [Check2B]),
%% Will replace existing config
Check1B = sql_check(<<"select 1 where false">>),
Validation1B = validation(Name1, [Check1B]),
%% New validation; should be appended
Name4 = <<"4">>,
Check4 = sql_check(<<"select 4 where true">>),
Validation4 = validation(Name4, [Check4]),
ConfRootBin = <<"schema_validation">>,
ConfigToLoad1 = #{
ConfRootBin => #{
<<"validations">> => [Validation2B, Validation1B, Validation4]
}
},
ConfigToLoadBin1 = iolist_to_binary(hocon_pp:do(ConfigToLoad1, #{})),
?assertMatch(ok, emqx_conf_cli:load_config(ConfigToLoadBin1, #{mode => merge})),
ExpectedValidations1 = normalize_validations([
Validation1A,
Validation2A,
Validation3,
Validation4
]),
?assertMatch(
#{
ConfRootBin := #{
<<"validations">> := ExpectedValidations1
}
},
emqx_conf_cli:get_config(<<"schema_validation">>)
),
?assertIndexOrder([Name1, Name2, Name3, Name4], <<"t/a">>),
%% Replace
Check4B = sql_check(<<"select 4, true where true">>),
Validation4B = validation(Name4, [Check4B]),
Name5 = <<"5">>,
Check5 = sql_check(<<"select 5 where true">>),
Validation5 = validation(Name5, [Check5]),
ConfigToLoad2 = #{
ConfRootBin => #{<<"validations">> => [Validation4B, Validation3, Validation5]}
},
ConfigToLoadBin2 = iolist_to_binary(hocon_pp:do(ConfigToLoad2, #{})),
?assertMatch(ok, emqx_conf_cli:load_config(ConfigToLoadBin2, #{mode => replace})),
ExpectedValidations2 = normalize_validations([Validation4B, Validation3, Validation5]),
?assertMatch(
#{
ConfRootBin := #{
<<"validations">> := ExpectedValidations2
}
},
emqx_conf_cli:get_config(<<"schema_validation">>)
),
?assertIndexOrder([Name4, Name3, Name5], <<"t/a">>),
ok.

View File

@ -751,7 +751,6 @@ safe_filename(Filename) when is_list(Filename) ->
when
Func :: fun((T) -> any()),
T :: any().
diff_lists(New, Old, KeyFunc) when is_list(New) andalso is_list(Old) ->
Removed =
lists:foldl(