feat(schema validation): implement backup restore

Fixes https://emqx.atlassian.net/browse/EMQX-12346
This commit is contained in:
Thales Macedo Garitezi 2024-05-14 15:01:39 -03:00
parent 1730a41337
commit fd949240c0
2 changed files with 143 additions and 0 deletions

View File

@ -35,6 +35,10 @@
%% `emqx_config_handler' API %% `emqx_config_handler' API
-export([pre_config_update/3, post_config_update/5]). -export([pre_config_update/3, post_config_update/5]).
%% `emqx_config_backup' API
-behaviour(emqx_config_backup).
-export([import_config/1]).
%% Internal exports %% Internal exports
-export([parse_sql_check/1]). -export([parse_sql_check/1]).
@ -49,6 +53,7 @@
-define(TRACE_TAG, "SCHEMA_VALIDATION"). -define(TRACE_TAG, "SCHEMA_VALIDATION").
-define(CONF_ROOT, schema_validation). -define(CONF_ROOT, schema_validation).
-define(CONF_ROOT_BIN, <<"schema_validation">>).
-define(VALIDATIONS_CONF_PATH, [?CONF_ROOT, validations]). -define(VALIDATIONS_CONF_PATH, [?CONF_ROOT, validations]).
-type validation_name() :: binary(). -type validation_name() :: binary().
@ -199,6 +204,60 @@ post_config_update(?VALIDATIONS_CONF_PATH, {reorder, _Order}, New, _Old, _AppEnv
ok = emqx_schema_validation_registry:reindex_positions(New), ok = emqx_schema_validation_registry:reindex_positions(New),
ok. ok.
%%------------------------------------------------------------------------------
%% `emqx_config_backup' API
%%------------------------------------------------------------------------------
import_config(#{?CONF_ROOT_BIN := RawConf0}) ->
OldRawValidations = emqx_config:get_raw([?CONF_ROOT_BIN, <<"validations">>], []),
{ImportedRawValidations, RawConf1} =
case maps:take(<<"validations">>, RawConf0) of
error ->
{[], RawConf0};
{V, R} ->
{V, R}
end,
%% If there's a matching validation, we don't overwrite it. We don't remove any
%% validations, either.
#{added := NewRawValidations} = emqx_utils:diff_lists(
ImportedRawValidations,
OldRawValidations,
fun(#{<<"name">> := N}) -> N end
),
OtherConfs = maps:to_list(RawConf1),
Result = emqx_utils:foldl_while(
fun
({validation, RawValidation}, Acc) ->
case insert(RawValidation) of
{ok, _} ->
#{<<"name">> := N} = RawValidation,
ChangedPath = [?CONF_ROOT, validations, '?', N],
{cont, [ChangedPath | Acc]};
{error, _} = Error ->
{halt, Error}
end;
({Key, RawConf}, Acc) ->
case emqx_conf:update([?CONF_ROOT_BIN, Key], RawConf, #{override_to => cluster}) of
{ok, _} ->
ChangedPath = [?CONF_ROOT, Key],
{conf, [ChangedPath | Acc]};
{error, _} = Error ->
{halt, Error}
end
end,
[],
OtherConfs ++
[{validation, NewValidation} || NewValidation <- NewRawValidations]
),
case Result of
{error, Reason} ->
{error, #{root_key => ?CONF_ROOT, reason => Reason}};
ChangedPaths ->
{ok, #{root_key => ?CONF_ROOT, changed => ChangedPaths}}
end;
import_config(_RawConf) ->
{ok, #{root_key => ?CONF_ROOT, changed => []}}.
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% Internal exports %% Internal exports
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------

View File

@ -229,6 +229,29 @@ monitor_metrics() ->
ct:pal("monitor metrics result:\n ~p", [Res]), ct:pal("monitor metrics result:\n ~p", [Res]),
simplify_result(Res). simplify_result(Res).
upload_backup(BackupFilePath) ->
Path = emqx_mgmt_api_test_util:api_path(["data", "files"]),
Res = emqx_mgmt_api_test_util:upload_request(
Path,
BackupFilePath,
"filename",
<<"application/octet-stream">>,
[],
emqx_mgmt_api_test_util:auth_header_()
),
simplify_result(Res).
export_backup() ->
Path = emqx_mgmt_api_test_util:api_path(["data", "export"]),
Res = request(post, Path, []),
simplify_result(Res).
import_backup(BackupName) ->
Path = emqx_mgmt_api_test_util:api_path(["data", "import"]),
Body = #{<<"filename">> => unicode:characters_to_binary(BackupName)},
Res = request(post, Path, Body),
simplify_result(Res).
connect(ClientId) -> connect(ClientId) ->
connect(ClientId, _IsPersistent = false). connect(ClientId, _IsPersistent = false).
@ -1216,3 +1239,64 @@ t_schema_check_protobuf(_Config) ->
), ),
ok. ok.
%% Tests that restoring a backup config works.
%% * Existing validations (identified by `name') are left untouched.
%% * No validations are removed.
%% * New validations are appended to the existing list.
%% * Existing validations are not reordered.
t_import_config_backup(_Config) ->
%% Setup backup file.
%% Will clash with existing validation; different order.
Name2 = <<"2">>,
Check2B = sql_check(<<"select 2 where false">>),
Validation2B = validation(Name2, [Check2B]),
{201, _} = insert(Validation2B),
%% Will clash with existing validation.
Name1 = <<"1">>,
Check1B = sql_check(<<"select 1 where false">>),
Validation1B = validation(Name1, [Check1B]),
{201, _} = insert(Validation1B),
%% New validation; should be appended
Name4 = <<"4">>,
Check4 = sql_check(<<"select 4 where true">>),
Validation4 = validation(Name4, [Check4]),
{201, _} = insert(Validation4),
{200, #{<<"filename">> := BackupName}} = export_backup(),
%% Clear this setup and pretend we have other data to begin with.
clear_all_validations(),
{200, []} = list(),
Check1A = sql_check(<<"select 1 where true">>),
Validation1A = validation(Name1, [Check1A]),
{201, _} = insert(Validation1A),
Check2A = sql_check(<<"select 2 where true">>),
Validation2A = validation(Name2, [Check2A]),
{201, _} = insert(Validation2A),
Name3 = <<"3">>,
Check3 = sql_check(<<"select 3 where true">>),
Validation3 = validation(Name3, [Check3]),
{201, _} = insert(Validation3),
{204, _} = import_backup(BackupName),
ExpectedValidations = [
V#{<<"topics">> := [T]}
|| #{<<"topics">> := T} = V <- [
Validation1A,
Validation2A,
Validation3,
Validation4
]
],
?assertMatch({200, ExpectedValidations}, list()),
?assertIndexOrder([Name1, Name2, Name3, Name4], <<"t/a">>),
ok.