%%--------------------------------------------------------------------
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
|
|
-module(emqx_message_validation_tests).
|
|
|
|
-include_lib("eunit/include/eunit.hrl").
|
|
|
|
%% Dotted config path of the validations list itself. The "names are unique"
%% case asserts that the schema error for a duplicated name carries this path.
-define(VALIDATIONS_PATH, "message_validation.validations").
|
|
|
|
%%------------------------------------------------------------------------------
%% Helper functions
%%------------------------------------------------------------------------------
|
|
|
|
%% Nests `InnerConfigs' under `message_validation.validations', runs the
%% resulting raw config through `hocon_tconf:check_plain/3' against
%% `emqx_message_validation_schema', and returns the value found at that same
%% path in the checked output.
%%
%% Anything raised by the schema check propagates to the caller (the test
%% cases rely on this with `?_assertThrow'). A checked result lacking the
%% expected path fails with `badmatch'.
parse_and_check(InnerConfigs) ->
    Root = <<"message_validation">>,
    Inner = <<"validations">>,
    CheckOpts = #{
        required => false,
        atom_key => false,
        make_serializable => false
    },
    CheckedConf = hocon_tconf:check_plain(
        emqx_message_validation_schema,
        #{Root => #{Inner => InnerConfigs}},
        CheckOpts
    ),
    #{Root := #{Inner := Checked}} = CheckedConf,
    Checked.
|
|
|
|
%% Builds a raw validation config with the given name and checks, using the
%% defaults of `validation/3' and no overrides.
validation(Name, Checks) ->
    NoOverrides = #{},
    validation(Name, Checks, NoOverrides).
|
|
|
|
%% Builds a raw (binary-keyed) validation config.
%%
%% `Name' and `Checks' fill the <<"name">> and <<"checks">> fields; every other
%% field gets a fixed default. `Overrides' is then combined with those defaults
%% through `emqx_utils_maps:deep_merge/2' (defaults first, overrides second).
validation(Name, Checks, Overrides) ->
    Base = #{
        <<"name">> => Name,
        <<"checks">> => Checks,
        <<"enable">> => true,
        <<"description">> => <<"my validation">>,
        <<"tags">> => [<<"some">>, <<"tags">>],
        <<"topics">> => <<"t/+">>,
        <<"strategy">> => <<"all_pass">>,
        <<"failure_action">> => <<"drop">>,
        <<"log_failure">> => #{<<"level">> => <<"warning">>}
    },
    emqx_utils_maps:deep_merge(Base, Overrides).
|
|
|
|
%% Raw SQL check config using the statement `select * where true'.
sql_check() ->
    DefaultSQL = <<"select * where true">>,
    sql_check(DefaultSQL).
|
|
|
|
%% Raw check config of type <<"sql">> carrying the given SQL statement.
sql_check(SQL) ->
    #{<<"sql">> => SQL, <<"type">> => <<"sql">>}.
|
|
|
|
%% Raw schema check config for the given type and serde name, with no extra
%% fields; see `schema_check/3'.
schema_check(Type, SerdeName) ->
    NoOverrides = #{},
    schema_check(Type, SerdeName, NoOverrides).
|
|
|
|
%% Raw schema check config.
%%
%% `Type' is converted with `emqx_utils_conv:bin/1' and stored under <<"type">>;
%% `SerdeName' is stored under <<"schema">>. `Overrides' is combined with that
%% base map via `emqx_utils_maps:deep_merge/2' (used by the tests to add
%% <<"message_name">> for protobuf checks).
schema_check(Type, SerdeName, Overrides) ->
    Base = #{
        <<"type">> => emqx_utils_conv:bin(Type),
        <<"schema">> => SerdeName
    },
    emqx_utils_maps:deep_merge(Base, Overrides).
|
|
|
|
%% Parses `SQL' with `emqx_message_validation:parse_sql_check/1' and evaluates
%% the resulting check against `Message' under a minimal validation context
%% (name plus warning-level failure logging). Returns whatever
%% `emqx_message_validation:evaluate_sql_check/3' returns; a parse result other
%% than `{ok, _}' fails with `badmatch'.
eval_sql(Message, SQL) ->
    ValidationCtx = #{
        name => <<"validation">>,
        log_failure => #{level => warning}
    },
    {ok, ParsedCheck} = emqx_message_validation:parse_sql_check(SQL),
    emqx_message_validation:evaluate_sql_check(ParsedCheck, ValidationCtx, Message).
|
|
|
|
%% Test message built purely from the defaults of `message/1'.
message() ->
    NoOpts = #{},
    message(NoOpts).
|
|
|
|
%% Builds a test message via `emqx_message:from_map/1'.
%%
%% Defaults: freshly generated id and sender, QoS 0, topic <<"t/t">>, a
%% JSON-encoded payload of `#{value => 10}', a fixed timestamp and one MQTT v5
%% user property. `Opts' is combined with the defaults through
%% `emqx_utils_maps:deep_merge/2' before conversion.
message(Opts) ->
    MsgId = emqx_guid:gen(),
    Sender = emqx_guid:to_hexstr(emqx_guid:gen()),
    Headers = #{
        proto_ver => v5,
        properties => #{'User-Property' => [{<<"a">>, <<"b">>}]}
    },
    Payload = emqx_utils_json:encode(#{value => 10}),
    Defaults = #{
        id => MsgId,
        qos => 0,
        from => Sender,
        flags => #{retain => false},
        headers => Headers,
        topic => <<"t/t">>,
        payload => Payload,
        timestamp => 1710272561615,
        extra => []
    },
    Merged = emqx_utils_maps:deep_merge(Defaults, Opts),
    emqx_message:from_map(Merged).
|
|
|
|
%%------------------------------------------------------------------------------
%% Test cases
%%------------------------------------------------------------------------------
|
|
|
|
%% Schema-level expectations for the validations config: topic normalization,
%% rejected SQL shapes, name uniqueness, non-empty checks and unknown check
%% types. Each case is bound to a name and the list is returned in the same
%% order as the cases are declared.
schema_test_() ->
    %% A single binary topic comes back from the schema as a one-element list.
    TopicsFromBinary =
        {"topics is always a list 1",
            ?_assertMatch(
                [#{<<"topics">> := [<<"t/1">>]}],
                parse_and_check([
                    validation(
                        <<"foo">>,
                        [sql_check()],
                        #{<<"topics">> => <<"t/1">>}
                    )
                ])
            )},
    %% A topic list is kept as a list.
    TopicsFromList =
        {"topics is always a list 2",
            ?_assertMatch(
                [#{<<"topics">> := [<<"t/1">>]}],
                parse_and_check([
                    validation(
                        <<"foo">>,
                        [sql_check()],
                        #{<<"topics">> => [<<"t/1">>]}
                    )
                ])
            )},
    ForeachRejected =
        {"foreach expression is not allowed",
            ?_assertThrow(
                {_Schema, [
                    #{
                        reason := foreach_not_allowed,
                        kind := validation_error
                    }
                ]},
                parse_and_check([
                    validation(
                        <<"foo">>,
                        [sql_check(<<"foreach foo as f where true">>)]
                    )
                ])
            )},
    FromClauseRejected =
        {"from clause is not allowed",
            ?_assertThrow(
                {_Schema, [
                    #{
                        reason := non_empty_from_clause,
                        kind := validation_error
                    }
                ]},
                parse_and_check([
                    validation(
                        <<"foo">>,
                        [sql_check(<<"select * from t">>)]
                    )
                ])
            )},
    %% Two validations sharing a name are rejected at the list level.
    UniqueNames =
        {"names are unique",
            ?_assertThrow(
                {_Schema, [
                    #{
                        reason := <<"duplicated name:", _/binary>>,
                        path := ?VALIDATIONS_PATH,
                        kind := validation_error
                    }
                ]},
                parse_and_check([
                    validation(<<"foo">>, [sql_check()]),
                    validation(<<"foo">>, [sql_check()])
                ])
            )},
    NonEmptyChecks =
        {"checks must be non-empty",
            ?_assertThrow(
                {_Schema, [
                    #{
                        reason := "at least one check must be defined",
                        kind := validation_error
                    }
                ]},
                parse_and_check([
                    validation(
                        <<"foo">>,
                        []
                    )
                ])
            )},
    BogusCheckType =
        {"bogus check type",
            ?_assertThrow(
                {_Schema, [
                    #{
                        expected := <<"sql", _/binary>>,
                        kind := validation_error,
                        field_name := type
                    }
                ]},
                parse_and_check([validation(<<"foo">>, [#{<<"type">> => <<"foo">>}])])
            )},
    [
        TopicsFromBinary,
        TopicsFromList,
        ForeachRejected,
        FromClauseRejected,
        UniqueNames,
        NonEmptyChecks,
        BogusCheckType
    ].
|
|
|
|
%% One case per invalid validation name (the name itself is the case title).
%% Each must be rejected by the schema with a "must conform to regex:" error
%% located at the first validation's `name' field.
invalid_names_test_() ->
    InvalidNames = [
        <<"">>,
        <<"_name">>,
        <<"name$">>,
        <<"name!">>,
        <<"some name">>,
        <<"nãme"/utf8>>,
        <<"test_哈哈"/utf8>>
    ],
    [
        {InvalidName,
            ?_assertThrow(
                {_Schema, [
                    #{
                        reason := <<"must conform to regex:", _/binary>>,
                        kind := validation_error,
                        path := "message_validation.validations.1.name"
                    }
                ]},
                parse_and_check([validation(InvalidName, [sql_check()])])
            )}
     || InvalidName <- InvalidNames
    ].
|
|
|
|
%% Evaluates SQL checks against the default test message (whose payload is
%% `{"value": 10}') and asserts whether each statement passes or fails.
check_test_() ->
    %% The message is built inside the test fun, i.e. when the case runs,
    %% not when this generator is evaluated.
    Passes = fun(Title, SQL) ->
        {Title, ?_assert(eval_sql(message(), SQL))}
    end,
    Fails = fun(Title, SQL) ->
        {Title, ?_assertNot(eval_sql(message(), SQL))}
    end,
    [
        Fails("denied by payload 1", <<"select * where payload.value > 15">>),
        Fails("denied by payload 2", <<"select payload.value as x where x > 15">>),
        Passes("allowed by payload 1", <<"select * where payload.value > 5">>),
        Passes("allowed by payload 2", <<"select payload.value as x where x > 5">>),
        Passes("always passes 1", <<"select * where true">>),
        Passes("always passes 2", <<"select * where 1 = 1">>),
        Fails("never passes 1", <<"select * where false">>),
        Fails("never passes 2", <<"select * where 1 = 2">>),
        Fails("never passes 3", <<"select * where true and false">>)
    ].
|
|
|
|
%% Duplicate detection among the checks of a single validation.
%%
%% SQL checks may repeat freely. Schema checks may share a serde name as long
%% as their types differ. Repeating the same type/name pair is rejected with a
%% "duplicated schema checks: ..." error located at the validation's `checks'
%% field, regardless of where the duplicates sit in the list.
duplicated_check_test_() ->
    [
        {"duplicated sql checks are not checked",
            ?_assertMatch(
                [#{<<"checks">> := [_, _]}],
                parse_and_check([
                    validation(<<"foo">>, [sql_check(), sql_check()])
                ])
            )},
        {"different serdes with same name",
            ?_assertMatch(
                [#{<<"checks">> := [_, _, _]}],
                parse_and_check([
                    validation(<<"foo">>, [
                        schema_check(json, <<"a">>),
                        schema_check(avro, <<"a">>),
                        schema_check(
                            protobuf,
                            <<"a">>,
                            #{<<"message_name">> => <<"a">>}
                        )
                    ])
                ])
            )},
        %% Adjacent duplicates.
        {"duplicated serdes 1",
            ?_assertThrow(
                {_Schema, [
                    #{
                        reason := <<"duplicated schema checks: json:a">>,
                        kind := validation_error,
                        path := "message_validation.validations.1.checks"
                    }
                ]},
                parse_and_check([
                    validation(<<"foo">>, [
                        schema_check(json, <<"a">>),
                        schema_check(json, <<"a">>)
                    ])
                ])
            )},
        %% Duplicates separated by an unrelated SQL check.
        {"duplicated serdes 2",
            ?_assertThrow(
                {_Schema, [
                    #{
                        reason := <<"duplicated schema checks: json:a">>,
                        kind := validation_error,
                        path := "message_validation.validations.1.checks"
                    }
                ]},
                parse_and_check([
                    validation(<<"foo">>, [
                        schema_check(json, <<"a">>),
                        sql_check(),
                        schema_check(json, <<"a">>)
                    ])
                ])
            )},
        %% Duplicates followed by an unrelated SQL check.
        {"duplicated serdes 3",
            ?_assertThrow(
                {_Schema, [
                    #{
                        reason := <<"duplicated schema checks: json:a">>,
                        kind := validation_error,
                        path := "message_validation.validations.1.checks"
                    }
                ]},
                parse_and_check([
                    validation(<<"foo">>, [
                        schema_check(json, <<"a">>),
                        schema_check(json, <<"a">>),
                        sql_check()
                    ])
                ])
            )},
        %% The same check three times is still reported once as `json:a'.
        {"duplicated serdes 4",
            ?_assertThrow(
                {_Schema, [
                    #{
                        reason := <<"duplicated schema checks: json:a">>,
                        kind := validation_error,
                        path := "message_validation.validations.1.checks"
                    }
                ]},
                parse_and_check([
                    validation(<<"foo">>, [
                        schema_check(json, <<"a">>),
                        schema_check(json, <<"a">>),
                        schema_check(json, <<"a">>)
                    ])
                ])
            )},
        %% Two distinct duplicated pairs; only the message prefix is asserted,
        %% the listing of the pairs after it is left unconstrained.
        %% Title was previously a second "duplicated serdes 4", which made the
        %% two cases indistinguishable in failure reports.
        {"duplicated serdes 5",
            ?_assertThrow(
                {_Schema, [
                    #{
                        reason := <<"duplicated schema checks: ", _/binary>>,
                        kind := validation_error,
                        path := "message_validation.validations.1.checks"
                    }
                ]},
                parse_and_check([
                    validation(<<"foo">>, [
                        schema_check(json, <<"a">>),
                        schema_check(json, <<"a">>),
                        schema_check(avro, <<"b">>),
                        schema_check(avro, <<"b">>)
                    ])
                ])
            )}
    ].
|