From 62030e89429b5a95d7e0ff37061db52ee83a120a Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Wed, 20 Mar 2024 15:26:00 -0300 Subject: [PATCH] feat(message validation): forbid repeated schema checks Fixes https://emqx.atlassian.net/browse/EMQX-12054 --- .../src/emqx_message_validation_schema.erl | 49 ++++++- ...emqx_message_validation_http_api_SUITE.erl | 15 +++ .../test/emqx_message_validation_tests.erl | 123 ++++++++++++++++++ 3 files changed, 181 insertions(+), 6 deletions(-) diff --git a/apps/emqx_message_validation/src/emqx_message_validation_schema.erl b/apps/emqx_message_validation/src/emqx_message_validation_schema.erl index 618b73481..9a27915d8 100644 --- a/apps/emqx_message_validation/src/emqx_message_validation_schema.erl +++ b/apps/emqx_message_validation/src/emqx_message_validation_schema.erl @@ -91,12 +91,7 @@ fields(validation) -> #{ required => true, desc => ?DESC("checks"), - validator => fun - ([]) -> - {error, "at least one check must be defined"}; - (_) -> - ok - end + validator => fun validate_unique_schema_checks/1 } )} ]; @@ -232,3 +227,45 @@ do_validate_unique_names([#{<<"name">> := Name} | _Rest], Acc) when is_map_key(N {error, <<"duplicated name: ", Name/binary>>}; do_validate_unique_names([#{<<"name">> := Name} | Rest], Acc) -> do_validate_unique_names(Rest, Acc#{Name => true}). + +validate_unique_schema_checks([]) -> + {error, "at least one check must be defined"}; +validate_unique_schema_checks(Checks) -> + Seen = sets:new([{version, 2}]), + Duplicated = sets:new([{version, 2}]), + do_validate_unique_schema_checks(Checks, Seen, Duplicated). + +do_validate_unique_schema_checks(_Checks = [], _Seen, Duplicated) -> + case sets:to_list(Duplicated) of + [] -> + ok; + DuplicatedChecks0 -> + DuplicatedChecks = + lists:map( + fun({Type, SerdeName}) -> + [atom_to_binary(Type), ":", SerdeName] + end, + DuplicatedChecks0 + ), + Msg = iolist_to_binary([ + <<"duplicated schema checks: ">>, + lists:join(", ", DuplicatedChecks) + ]), + {error, Msg} + end; +do_validate_unique_schema_checks( + [#{<<"type">> := Type, <<"schema">> := SerdeName} | Rest], + Seen0, + Duplicated0 +) -> + Check = {Type, SerdeName}, + case sets:is_element(Check, Seen0) of + true -> + Duplicated = sets:add_element(Check, Duplicated0), + do_validate_unique_schema_checks(Rest, Seen0, Duplicated); + false -> + Seen = sets:add_element(Check, Seen0), + do_validate_unique_schema_checks(Rest, Seen, Duplicated0) + end; +do_validate_unique_schema_checks([_Check | Rest], Seen, Duplicated) -> + do_validate_unique_schema_checks(Rest, Seen, Duplicated). diff --git a/apps/emqx_message_validation/test/emqx_message_validation_http_api_SUITE.erl b/apps/emqx_message_validation/test/emqx_message_validation_http_api_SUITE.erl index 8f052a147..9e22900fb 100644 --- a/apps/emqx_message_validation/test/emqx_message_validation_http_api_SUITE.erl +++ b/apps/emqx_message_validation/test/emqx_message_validation_http_api_SUITE.erl @@ -661,6 +661,21 @@ t_enable_disable_via_api_endpoint(_Config) -> ok. +t_duplicated_schema_checks(_Config) -> + Name1 = <<"foo">>, + SerdeName = <<"myserde">>, + Check = schema_check(json, SerdeName), + + Validation1 = validation(Name1, [Check, sql_check(), Check]), + ?assertMatch({400, _}, insert(Validation1)), + + Validation2 = validation(Name1, [Check, sql_check()]), + ?assertMatch({201, _}, insert(Validation2)), + + ?assertMatch({400, _}, update(Validation1)), + + ok. + %% Check the `all_pass' strategy t_all_pass(_Config) -> Name1 = <<"foo">>, diff --git a/apps/emqx_message_validation/test/emqx_message_validation_tests.erl b/apps/emqx_message_validation/test/emqx_message_validation_tests.erl index 5c0d7d1d8..c344f6202 100644 --- a/apps/emqx_message_validation/test/emqx_message_validation_tests.erl +++ b/apps/emqx_message_validation/test/emqx_message_validation_tests.erl @@ -52,6 +52,18 @@ sql_check(SQL) -> <<"sql">> => SQL }. +schema_check(Type, SerdeName) -> + schema_check(Type, SerdeName, _Overrides = #{}). + +schema_check(Type, SerdeName, Overrides) -> + emqx_utils_maps:deep_merge( + #{ + <<"type">> => emqx_utils_conv:bin(Type), + <<"schema">> => SerdeName + }, + Overrides + ). + eval_sql(Message, SQL) -> {ok, Check} = emqx_message_validation:parse_sql_check(SQL), Validation = #{log_failure => #{level => warning}, name => <<"validation">>}, @@ -217,3 +229,114 @@ check_test_() -> {"never passes 2", ?_assertNot(eval_sql(message(), <<"select * where 1 = 2">>))}, {"never passes 3", ?_assertNot(eval_sql(message(), <<"select * where true and false">>))} ]. + +duplicated_check_test_() -> + [ + {"duplicated sql checks are not checked", + ?_assertMatch( + [#{<<"checks">> := [_, _]}], + parse_and_check([ + validation(<<"foo">>, [sql_check(), sql_check()]) + ]) + )}, + {"different serdes with same name", + ?_assertMatch( + [#{<<"checks">> := [_, _, _]}], + parse_and_check([ + validation(<<"foo">>, [ + schema_check(json, <<"a">>), + schema_check(avro, <<"a">>), + schema_check( + protobuf, + <<"a">>, + #{<<"message_name">> => <<"a">>} + ) + ]) + ]) + )}, + {"duplicated serdes 1", + ?_assertThrow( + {_Schema, [ + #{ + reason := <<"duplicated schema checks: json:a">>, + kind := validation_error, + path := "message_validation.validations.1.checks" + } + ]}, + parse_and_check([ + validation(<<"foo">>, [ + schema_check(json, <<"a">>), + schema_check(json, <<"a">>) + ]) + ]) + )}, + {"duplicated serdes 2", + ?_assertThrow( + {_Schema, [ + #{ + reason := <<"duplicated schema checks: json:a">>, + kind := validation_error, + path := "message_validation.validations.1.checks" + } + ]}, + parse_and_check([ + validation(<<"foo">>, [ + schema_check(json, <<"a">>), + sql_check(), + schema_check(json, <<"a">>) + ]) + ]) + )}, + {"duplicated serdes 3", + ?_assertThrow( + {_Schema, [ + #{ + reason := <<"duplicated schema checks: json:a">>, + kind := validation_error, + path := "message_validation.validations.1.checks" + } + ]}, + parse_and_check([ + validation(<<"foo">>, [ + schema_check(json, <<"a">>), + schema_check(json, <<"a">>), + sql_check() + ]) + ]) + )}, + {"duplicated serdes 4", + ?_assertThrow( + {_Schema, [ + #{ + reason := <<"duplicated schema checks: json:a">>, + kind := validation_error, + path := "message_validation.validations.1.checks" + } + ]}, + parse_and_check([ + validation(<<"foo">>, [ + schema_check(json, <<"a">>), + schema_check(json, <<"a">>), + schema_check(json, <<"a">>) + ]) + ]) + )}, + {"duplicated serdes 4", + ?_assertThrow( + {_Schema, [ + #{ + reason := <<"duplicated schema checks: ", _/binary>>, + kind := validation_error, + path := "message_validation.validations.1.checks" + } + ]}, + parse_and_check([ + validation(<<"foo">>, [ + schema_check(json, <<"a">>), + schema_check(json, <<"a">>), + schema_check(avro, <<"b">>), + schema_check(avro, <<"b">>) + ]) + ]) + )} + ].