diff --git a/apps/emqx_conf/src/emqx_conf_cli.erl b/apps/emqx_conf/src/emqx_conf_cli.erl index e0ad4cb4f..08a86d49a 100644 --- a/apps/emqx_conf/src/emqx_conf_cli.erl +++ b/apps/emqx_conf/src/emqx_conf_cli.erl @@ -36,6 +36,7 @@ -define(CONF, conf). -define(AUDIT_MOD, audit). -define(UPDATE_READONLY_KEYS_PROHIBITED, <<"Cannot update read-only key '~s'.">>). +-define(SCHEMA_VALIDATION_CONF_ROOT_BIN, <<"schema_validation">>). -dialyzer({no_match, [load/0]}). @@ -330,6 +331,10 @@ update_config_cluster( #{mode := merge} = Opts ) -> check_res(Key, emqx_authn:merge_config(Conf), Conf, Opts); +update_config_cluster(?SCHEMA_VALIDATION_CONF_ROOT_BIN = Key, NewConf, #{mode := merge} = Opts) -> + check_res(Key, emqx_conf:update([Key], {merge, NewConf}, ?OPTIONS), NewConf, Opts); +update_config_cluster(?SCHEMA_VALIDATION_CONF_ROOT_BIN = Key, NewConf, #{mode := replace} = Opts) -> + check_res(Key, emqx_conf:update([Key], {replace, NewConf}, ?OPTIONS), NewConf, Opts); update_config_cluster(Key, NewConf, #{mode := merge} = Opts) -> Merged = merge_conf(Key, NewConf), check_res(Key, emqx_conf:update([Key], Merged, ?OPTIONS), NewConf, Opts); diff --git a/apps/emqx_schema_validation/src/emqx_schema_validation.erl b/apps/emqx_schema_validation/src/emqx_schema_validation.erl index 12aa1e733..3ec0e019d 100644 --- a/apps/emqx_schema_validation/src/emqx_schema_validation.erl +++ b/apps/emqx_schema_validation/src/emqx_schema_validation.erl @@ -35,6 +35,10 @@ %% `emqx_config_handler' API -export([pre_config_update/3, post_config_update/5]). +%% `emqx_config_backup' API +-behaviour(emqx_config_backup). +-export([import_config/1]). + %% Internal exports -export([parse_sql_check/1]). @@ -49,6 +53,7 @@ -define(TRACE_TAG, "SCHEMA_VALIDATION"). -define(CONF_ROOT, schema_validation). +-define(CONF_ROOT_BIN, <<"schema_validation">>). -define(VALIDATIONS_CONF_PATH, [?CONF_ROOT, validations]). -type validation_name() :: binary(). @@ -60,12 +65,14 @@ -spec add_handler() -> ok. add_handler() -> + ok = emqx_config_handler:add_handler([?CONF_ROOT], ?MODULE), ok = emqx_config_handler:add_handler(?VALIDATIONS_CONF_PATH, ?MODULE), ok. -spec remove_handler() -> ok. remove_handler() -> ok = emqx_config_handler:remove_handler(?VALIDATIONS_CONF_PATH), + ok = emqx_config_handler:remove_handler([?CONF_ROOT]), ok. load() -> @@ -180,7 +187,12 @@ pre_config_update(?VALIDATIONS_CONF_PATH, {update, Validation}, OldValidations) pre_config_update(?VALIDATIONS_CONF_PATH, {delete, Validation}, OldValidations) -> delete(OldValidations, Validation); pre_config_update(?VALIDATIONS_CONF_PATH, {reorder, Order}, OldValidations) -> - reorder(OldValidations, Order). + reorder(OldValidations, Order); +pre_config_update([?CONF_ROOT], {merge, NewConfig}, OldConfig) -> + #{resulting_config := Config} = prepare_config_merge(NewConfig, OldConfig), + {ok, Config}; +pre_config_update([?CONF_ROOT], {replace, NewConfig}, _OldConfig) -> + {ok, NewConfig}. post_config_update(?VALIDATIONS_CONF_PATH, {append, #{<<"name">> := Name}}, New, _Old, _AppEnvs) -> {Pos, Validation} = fetch_with_index(New, Name), @@ -197,7 +209,81 @@ post_config_update(?VALIDATIONS_CONF_PATH, {delete, Name}, _New, Old, _AppEnvs) ok; post_config_update(?VALIDATIONS_CONF_PATH, {reorder, _Order}, New, _Old, _AppEnvs) -> ok = emqx_schema_validation_registry:reindex_positions(New), - ok. + ok; +post_config_update([?CONF_ROOT], {merge, _}, ResultingConfig, Old, _AppEnvs) -> + #{validations := ResultingValidations} = ResultingConfig, + #{validations := OldValidations} = Old, + #{added := NewValidations0} = + emqx_utils:diff_lists( + ResultingValidations, + OldValidations, + fun(#{name := N}) -> N end + ), + NewValidations = + lists:map( + fun(#{name := Name}) -> + {Pos, Validation} = fetch_with_index(ResultingValidations, Name), + ok = emqx_schema_validation_registry:insert(Pos, Validation), + #{name => Name, pos => Pos} + end, + NewValidations0 + ), + {ok, #{new_validations => NewValidations}}; +post_config_update([?CONF_ROOT], {replace, Input}, ResultingConfig, Old, _AppEnvs) -> + #{ + new_validations := NewValidations, + changed_validations := ChangedValidations0, + deleted_validations := DeletedValidations + } = prepare_config_replace(Input, Old), + #{validations := ResultingValidations} = ResultingConfig, + #{validations := OldValidations} = Old, + lists:foreach( + fun(Name) -> + {_Pos, Validation} = fetch_with_index(OldValidations, Name), + ok = emqx_schema_validation_registry:delete(Validation) + end, + DeletedValidations + ), + lists:foreach( + fun(Name) -> + {Pos, Validation} = fetch_with_index(ResultingValidations, Name), + ok = emqx_schema_validation_registry:insert(Pos, Validation) + end, + NewValidations + ), + ChangedValidations = + lists:map( + fun(Name) -> + {_Pos, OldValidation} = fetch_with_index(OldValidations, Name), + {Pos, NewValidation} = fetch_with_index(ResultingValidations, Name), + ok = emqx_schema_validation_registry:update(OldValidation, Pos, NewValidation), + #{name => Name, pos => Pos} + end, + ChangedValidations0 + ), + ok = emqx_schema_validation_registry:reindex_positions(ResultingValidations), + {ok, #{changed_validations => ChangedValidations}}. + +%%------------------------------------------------------------------------------ +%% `emqx_config_backup' API +%%------------------------------------------------------------------------------ + +import_config(#{?CONF_ROOT_BIN := RawConf0}) -> + Result = emqx_conf:update( + [?CONF_ROOT], + {merge, RawConf0}, + #{override_to => cluster, rawconf_with_defaults => true} + ), + case Result of + {error, Reason} -> + {error, #{root_key => ?CONF_ROOT, reason => Reason}}; + {ok, _} -> + Keys0 = maps:keys(RawConf0), + ChangedPaths = Keys0 -- [<<"validations">>], + {ok, #{root_key => ?CONF_ROOT, changed => ChangedPaths}} + end; +import_config(_RawConf) -> + {ok, #{root_key => ?CONF_ROOT, changed => []}}. %%------------------------------------------------------------------------------ %% Internal exports @@ -471,3 +557,55 @@ run_schema_validation_failed_hook(Message, Validation) -> #{name := Name} = Validation, ValidationContext = #{name => Name}, emqx_hooks:run('schema.validation_failed', [Message, ValidationContext]). + +%% "Merging" in the context of the validation array means: +%% * Existing validations (identified by `name') are left untouched. +%% * No validations are removed. +%% * New validations are appended to the existing list. +%% * Existing validations are not reordered. +prepare_config_merge(NewConfig0, OldConfig) -> + {ImportedRawValidations, NewConfigNoValidations} = + case maps:take(<<"validations">>, NewConfig0) of + error -> + {[], NewConfig0}; + {V, R} -> + {V, R} + end, + OldRawValidations = maps:get(<<"validations">>, OldConfig, []), + #{added := NewRawValidations} = emqx_utils:diff_lists( + ImportedRawValidations, + OldRawValidations, + fun(#{<<"name">> := N}) -> N end + ), + Config0 = emqx_utils_maps:deep_merge(OldConfig, NewConfigNoValidations), + Config = maps:update_with( + <<"validations">>, + fun(OldVs) -> OldVs ++ NewRawValidations end, + NewRawValidations, + Config0 + ), + #{ + new_validations => NewRawValidations, + resulting_config => Config + }. + +prepare_config_replace(NewConfig, OldConfig) -> + ImportedRawValidations = maps:get(<<"validations">>, NewConfig, []), + OldValidations = maps:get(validations, OldConfig, []), + %% Since, at this point, we have an input raw config but a parsed old config, we + %% project both to the to have only their names, and consider common names as changed. + #{ + added := NewValidations, + removed := DeletedValidations, + changed := ChangedValidations0, + identical := ChangedValidations1 + } = emqx_utils:diff_lists( + lists:map(fun(#{<<"name">> := N}) -> N end, ImportedRawValidations), + lists:map(fun(#{name := N}) -> N end, OldValidations), + fun(N) -> N end + ), + #{ + new_validations => NewValidations, + changed_validations => ChangedValidations0 ++ ChangedValidations1, + deleted_validations => DeletedValidations + }. diff --git a/apps/emqx_schema_validation/test/emqx_schema_validation_http_api_SUITE.erl b/apps/emqx_schema_validation/test/emqx_schema_validation_http_api_SUITE.erl index 0a5cd49cd..41731fa1b 100644 --- a/apps/emqx_schema_validation/test/emqx_schema_validation_http_api_SUITE.erl +++ b/apps/emqx_schema_validation/test/emqx_schema_validation_http_api_SUITE.erl @@ -229,6 +229,29 @@ monitor_metrics() -> ct:pal("monitor metrics result:\n ~p", [Res]), simplify_result(Res). +upload_backup(BackupFilePath) -> + Path = emqx_mgmt_api_test_util:api_path(["data", "files"]), + Res = emqx_mgmt_api_test_util:upload_request( + Path, + BackupFilePath, + "filename", + <<"application/octet-stream">>, + [], + emqx_mgmt_api_test_util:auth_header_() + ), + simplify_result(Res). + +export_backup() -> + Path = emqx_mgmt_api_test_util:api_path(["data", "export"]), + Res = request(post, Path, []), + simplify_result(Res). + +import_backup(BackupName) -> + Path = emqx_mgmt_api_test_util:api_path(["data", "import"]), + Body = #{<<"filename">> => unicode:characters_to_binary(BackupName)}, + Res = request(post, Path, Body), + simplify_result(Res). + connect(ClientId) -> connect(ClientId, _IsPersistent = false). @@ -438,6 +461,12 @@ assert_monitor_metrics() -> ), ok. +normalize_validations(RawValidations) -> + [ + V#{<<"topics">> := [T]} + || #{<<"topics">> := T} = V <- RawValidations + ]. + %%------------------------------------------------------------------------------ %% Testcases %%------------------------------------------------------------------------------ @@ -1216,3 +1245,142 @@ t_schema_check_protobuf(_Config) -> ), ok. + +%% Tests that restoring a backup config works. +%% * Existing validations (identified by `name') are left untouched. +%% * No validations are removed. +%% * New validations are appended to the existing list. +%% * Existing validations are not reordered. +t_import_config_backup(_Config) -> + %% Setup backup file. + + %% Will clash with existing validation; different order. + Name2 = <<"2">>, + Check2B = sql_check(<<"select 2 where false">>), + Validation2B = validation(Name2, [Check2B]), + {201, _} = insert(Validation2B), + + %% Will clash with existing validation. + Name1 = <<"1">>, + Check1B = sql_check(<<"select 1 where false">>), + Validation1B = validation(Name1, [Check1B]), + {201, _} = insert(Validation1B), + + %% New validation; should be appended + Name4 = <<"4">>, + Check4 = sql_check(<<"select 4 where true">>), + Validation4 = validation(Name4, [Check4]), + {201, _} = insert(Validation4), + + {200, #{<<"filename">> := BackupName}} = export_backup(), + + %% Clear this setup and pretend we have other data to begin with. + clear_all_validations(), + {200, []} = list(), + + Check1A = sql_check(<<"select 1 where true">>), + Validation1A = validation(Name1, [Check1A]), + {201, _} = insert(Validation1A), + + Check2A = sql_check(<<"select 2 where true">>), + Validation2A = validation(Name2, [Check2A]), + {201, _} = insert(Validation2A), + + Name3 = <<"3">>, + Check3 = sql_check(<<"select 3 where true">>), + Validation3 = validation(Name3, [Check3]), + {201, _} = insert(Validation3), + + {204, _} = import_backup(BackupName), + + ExpectedValidations = normalize_validations([ + Validation1A, + Validation2A, + Validation3, + Validation4 + ]), + ?assertMatch({200, ExpectedValidations}, list()), + ?assertIndexOrder([Name1, Name2, Name3, Name4], <<"t/a">>), + + ok. + +%% Tests that importing configurations from the CLI interface work. +t_load_config(_Config) -> + Name1 = <<"1">>, + Check1A = sql_check(<<"select 1 where true">>), + Validation1A = validation(Name1, [Check1A]), + {201, _} = insert(Validation1A), + + Name2 = <<"2">>, + Check2A = sql_check(<<"select 2 where true">>), + Validation2A = validation(Name2, [Check2A]), + {201, _} = insert(Validation2A), + + Name3 = <<"3">>, + Check3 = sql_check(<<"select 3 where true">>), + Validation3 = validation(Name3, [Check3]), + {201, _} = insert(Validation3), + + %% Config to load + %% Will replace existing config + Check2B = sql_check(<<"select 2 where false">>), + Validation2B = validation(Name2, [Check2B]), + + %% Will replace existing config + Check1B = sql_check(<<"select 1 where false">>), + Validation1B = validation(Name1, [Check1B]), + + %% New validation; should be appended + Name4 = <<"4">>, + Check4 = sql_check(<<"select 4 where true">>), + Validation4 = validation(Name4, [Check4]), + + ConfRootBin = <<"schema_validation">>, + ConfigToLoad1 = #{ + ConfRootBin => #{ + <<"validations">> => [Validation2B, Validation1B, Validation4] + } + }, + ConfigToLoadBin1 = iolist_to_binary(hocon_pp:do(ConfigToLoad1, #{})), + ?assertMatch(ok, emqx_conf_cli:load_config(ConfigToLoadBin1, #{mode => merge})), + ExpectedValidations1 = normalize_validations([ + Validation1A, + Validation2A, + Validation3, + Validation4 + ]), + ?assertMatch( + #{ + ConfRootBin := #{ + <<"validations">> := ExpectedValidations1 + } + }, + emqx_conf_cli:get_config(<<"schema_validation">>) + ), + ?assertIndexOrder([Name1, Name2, Name3, Name4], <<"t/a">>), + + %% Replace + Check4B = sql_check(<<"select 4, true where true">>), + Validation4B = validation(Name4, [Check4B]), + + Name5 = <<"5">>, + Check5 = sql_check(<<"select 5 where true">>), + Validation5 = validation(Name5, [Check5]), + + ConfigToLoad2 = #{ + ConfRootBin => #{<<"validations">> => [Validation4B, Validation3, Validation5]} + }, + ConfigToLoadBin2 = iolist_to_binary(hocon_pp:do(ConfigToLoad2, #{})), + ?assertMatch(ok, emqx_conf_cli:load_config(ConfigToLoadBin2, #{mode => replace})), + ExpectedValidations2 = normalize_validations([Validation4B, Validation3, Validation5]), + ?assertMatch( + #{ + ConfRootBin := #{ + <<"validations">> := ExpectedValidations2 + } + }, + emqx_conf_cli:get_config(<<"schema_validation">>) + ), + ?assertIndexOrder([Name4, Name3, Name5], <<"t/a">>), + + ok. diff --git a/apps/emqx_utils/src/emqx_utils.erl b/apps/emqx_utils/src/emqx_utils.erl index 007c2b54b..536b427b3 100644 --- a/apps/emqx_utils/src/emqx_utils.erl +++ b/apps/emqx_utils/src/emqx_utils.erl @@ -751,7 +751,6 @@ safe_filename(Filename) when is_list(Filename) -> when Func :: fun((T) -> any()), T :: any(). - diff_lists(New, Old, KeyFunc) when is_list(New) andalso is_list(Old) -> Removed = lists:foldl(