feat(message validation): forbid repeated schema checks
Fixes https://emqx.atlassian.net/browse/EMQX-12054
This commit is contained in:
parent
b8cd1c9020
commit
62030e8942
|
@ -91,12 +91,7 @@ fields(validation) ->
|
||||||
#{
|
#{
|
||||||
required => true,
|
required => true,
|
||||||
desc => ?DESC("checks"),
|
desc => ?DESC("checks"),
|
||||||
validator => fun
|
validator => fun validate_unique_schema_checks/1
|
||||||
([]) ->
|
|
||||||
{error, "at least one check must be defined"};
|
|
||||||
(_) ->
|
|
||||||
ok
|
|
||||||
end
|
|
||||||
}
|
}
|
||||||
)}
|
)}
|
||||||
];
|
];
|
||||||
|
@ -232,3 +227,45 @@ do_validate_unique_names([#{<<"name">> := Name} | _Rest], Acc) when is_map_key(N
|
||||||
{error, <<"duplicated name: ", Name/binary>>};
|
{error, <<"duplicated name: ", Name/binary>>};
|
||||||
do_validate_unique_names([#{<<"name">> := Name} | Rest], Acc) ->
|
do_validate_unique_names([#{<<"name">> := Name} | Rest], Acc) ->
|
||||||
do_validate_unique_names(Rest, Acc#{Name => true}).
|
do_validate_unique_names(Rest, Acc#{Name => true}).
|
||||||
|
|
||||||
|
validate_unique_schema_checks([]) ->
|
||||||
|
{error, "at least one check must be defined"};
|
||||||
|
validate_unique_schema_checks(Checks) ->
|
||||||
|
Seen = sets:new([{version, 2}]),
|
||||||
|
Duplicated = sets:new([{version, 2}]),
|
||||||
|
do_validate_unique_schema_checks(Checks, Seen, Duplicated).
|
||||||
|
|
||||||
|
do_validate_unique_schema_checks(_Checks = [], _Seen, Duplicated) ->
|
||||||
|
case sets:to_list(Duplicated) of
|
||||||
|
[] ->
|
||||||
|
ok;
|
||||||
|
DuplicatedChecks0 ->
|
||||||
|
DuplicatedChecks =
|
||||||
|
lists:map(
|
||||||
|
fun({Type, SerdeName}) ->
|
||||||
|
[atom_to_binary(Type), ":", SerdeName]
|
||||||
|
end,
|
||||||
|
DuplicatedChecks0
|
||||||
|
),
|
||||||
|
Msg = iolist_to_binary([
|
||||||
|
<<"duplicated schema checks: ">>,
|
||||||
|
lists:join(", ", DuplicatedChecks)
|
||||||
|
]),
|
||||||
|
{error, Msg}
|
||||||
|
end;
|
||||||
|
do_validate_unique_schema_checks(
|
||||||
|
[#{<<"type">> := Type, <<"schema">> := SerdeName} | Rest],
|
||||||
|
Seen0,
|
||||||
|
Duplicated0
|
||||||
|
) ->
|
||||||
|
Check = {Type, SerdeName},
|
||||||
|
case sets:is_element(Check, Seen0) of
|
||||||
|
true ->
|
||||||
|
Duplicated = sets:add_element(Check, Duplicated0),
|
||||||
|
do_validate_unique_schema_checks(Rest, Seen0, Duplicated);
|
||||||
|
false ->
|
||||||
|
Seen = sets:add_element(Check, Seen0),
|
||||||
|
do_validate_unique_schema_checks(Rest, Seen, Duplicated0)
|
||||||
|
end;
|
||||||
|
do_validate_unique_schema_checks([_Check | Rest], Seen, Duplicated) ->
|
||||||
|
do_validate_unique_schema_checks(Rest, Seen, Duplicated).
|
||||||
|
|
|
@ -661,6 +661,21 @@ t_enable_disable_via_api_endpoint(_Config) ->
|
||||||
|
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
|
t_duplicated_schema_checks(_Config) ->
|
||||||
|
Name1 = <<"foo">>,
|
||||||
|
SerdeName = <<"myserde">>,
|
||||||
|
Check = schema_check(json, SerdeName),
|
||||||
|
|
||||||
|
Validation1 = validation(Name1, [Check, sql_check(), Check]),
|
||||||
|
?assertMatch({400, _}, insert(Validation1)),
|
||||||
|
|
||||||
|
Validation2 = validation(Name1, [Check, sql_check()]),
|
||||||
|
?assertMatch({201, _}, insert(Validation2)),
|
||||||
|
|
||||||
|
?assertMatch({400, _}, update(Validation1)),
|
||||||
|
|
||||||
|
ok.
|
||||||
|
|
||||||
%% Check the `all_pass' strategy
|
%% Check the `all_pass' strategy
|
||||||
t_all_pass(_Config) ->
|
t_all_pass(_Config) ->
|
||||||
Name1 = <<"foo">>,
|
Name1 = <<"foo">>,
|
||||||
|
|
|
@ -52,6 +52,18 @@ sql_check(SQL) ->
|
||||||
<<"sql">> => SQL
|
<<"sql">> => SQL
|
||||||
}.
|
}.
|
||||||
|
|
||||||
|
schema_check(Type, SerdeName) ->
|
||||||
|
schema_check(Type, SerdeName, _Overrides = #{}).
|
||||||
|
|
||||||
|
schema_check(Type, SerdeName, Overrides) ->
|
||||||
|
emqx_utils_maps:deep_merge(
|
||||||
|
#{
|
||||||
|
<<"type">> => emqx_utils_conv:bin(Type),
|
||||||
|
<<"schema">> => SerdeName
|
||||||
|
},
|
||||||
|
Overrides
|
||||||
|
).
|
||||||
|
|
||||||
eval_sql(Message, SQL) ->
|
eval_sql(Message, SQL) ->
|
||||||
{ok, Check} = emqx_message_validation:parse_sql_check(SQL),
|
{ok, Check} = emqx_message_validation:parse_sql_check(SQL),
|
||||||
Validation = #{log_failure => #{level => warning}, name => <<"validation">>},
|
Validation = #{log_failure => #{level => warning}, name => <<"validation">>},
|
||||||
|
@ -217,3 +229,114 @@ check_test_() ->
|
||||||
{"never passes 2", ?_assertNot(eval_sql(message(), <<"select * where 1 = 2">>))},
|
{"never passes 2", ?_assertNot(eval_sql(message(), <<"select * where 1 = 2">>))},
|
||||||
{"never passes 3", ?_assertNot(eval_sql(message(), <<"select * where true and false">>))}
|
{"never passes 3", ?_assertNot(eval_sql(message(), <<"select * where true and false">>))}
|
||||||
].
|
].
|
||||||
|
|
||||||
|
duplicated_check_test_() ->
|
||||||
|
[
|
||||||
|
{"duplicated sql checks are not checked",
|
||||||
|
?_assertMatch(
|
||||||
|
[#{<<"checks">> := [_, _]}],
|
||||||
|
parse_and_check([
|
||||||
|
validation(<<"foo">>, [sql_check(), sql_check()])
|
||||||
|
])
|
||||||
|
)},
|
||||||
|
{"different serdes with same name",
|
||||||
|
?_assertMatch(
|
||||||
|
[#{<<"checks">> := [_, _, _]}],
|
||||||
|
parse_and_check([
|
||||||
|
validation(<<"foo">>, [
|
||||||
|
schema_check(json, <<"a">>),
|
||||||
|
schema_check(avro, <<"a">>),
|
||||||
|
schema_check(
|
||||||
|
protobuf,
|
||||||
|
<<"a">>,
|
||||||
|
#{<<"message_name">> => <<"a">>}
|
||||||
|
)
|
||||||
|
])
|
||||||
|
])
|
||||||
|
)},
|
||||||
|
{"duplicated serdes 1",
|
||||||
|
?_assertThrow(
|
||||||
|
{_Schema, [
|
||||||
|
#{
|
||||||
|
reason := <<"duplicated schema checks: json:a">>,
|
||||||
|
kind := validation_error,
|
||||||
|
path := "message_validation.validations.1.checks"
|
||||||
|
}
|
||||||
|
]},
|
||||||
|
parse_and_check([
|
||||||
|
validation(<<"foo">>, [
|
||||||
|
schema_check(json, <<"a">>),
|
||||||
|
schema_check(json, <<"a">>)
|
||||||
|
])
|
||||||
|
])
|
||||||
|
)},
|
||||||
|
{"duplicated serdes 2",
|
||||||
|
?_assertThrow(
|
||||||
|
{_Schema, [
|
||||||
|
#{
|
||||||
|
reason := <<"duplicated schema checks: json:a">>,
|
||||||
|
kind := validation_error,
|
||||||
|
path := "message_validation.validations.1.checks"
|
||||||
|
}
|
||||||
|
]},
|
||||||
|
parse_and_check([
|
||||||
|
validation(<<"foo">>, [
|
||||||
|
schema_check(json, <<"a">>),
|
||||||
|
sql_check(),
|
||||||
|
schema_check(json, <<"a">>)
|
||||||
|
])
|
||||||
|
])
|
||||||
|
)},
|
||||||
|
{"duplicated serdes 3",
|
||||||
|
?_assertThrow(
|
||||||
|
{_Schema, [
|
||||||
|
#{
|
||||||
|
reason := <<"duplicated schema checks: json:a">>,
|
||||||
|
kind := validation_error,
|
||||||
|
path := "message_validation.validations.1.checks"
|
||||||
|
}
|
||||||
|
]},
|
||||||
|
parse_and_check([
|
||||||
|
validation(<<"foo">>, [
|
||||||
|
schema_check(json, <<"a">>),
|
||||||
|
schema_check(json, <<"a">>),
|
||||||
|
sql_check()
|
||||||
|
])
|
||||||
|
])
|
||||||
|
)},
|
||||||
|
{"duplicated serdes 4",
|
||||||
|
?_assertThrow(
|
||||||
|
{_Schema, [
|
||||||
|
#{
|
||||||
|
reason := <<"duplicated schema checks: json:a">>,
|
||||||
|
kind := validation_error,
|
||||||
|
path := "message_validation.validations.1.checks"
|
||||||
|
}
|
||||||
|
]},
|
||||||
|
parse_and_check([
|
||||||
|
validation(<<"foo">>, [
|
||||||
|
schema_check(json, <<"a">>),
|
||||||
|
schema_check(json, <<"a">>),
|
||||||
|
schema_check(json, <<"a">>)
|
||||||
|
])
|
||||||
|
])
|
||||||
|
)},
|
||||||
|
{"duplicated serdes 4",
|
||||||
|
?_assertThrow(
|
||||||
|
{_Schema, [
|
||||||
|
#{
|
||||||
|
reason := <<"duplicated schema checks: ", _/binary>>,
|
||||||
|
kind := validation_error,
|
||||||
|
path := "message_validation.validations.1.checks"
|
||||||
|
}
|
||||||
|
]},
|
||||||
|
parse_and_check([
|
||||||
|
validation(<<"foo">>, [
|
||||||
|
schema_check(json, <<"a">>),
|
||||||
|
schema_check(json, <<"a">>),
|
||||||
|
schema_check(avro, <<"b">>),
|
||||||
|
schema_check(avro, <<"b">>)
|
||||||
|
])
|
||||||
|
])
|
||||||
|
)}
|
||||||
|
].
|
||||||
|
|
Loading…
Reference in New Issue