Merge pull request #12950 from thalesmg/fix-mv-duplicated-topics-r57-20240429
fix(message validation): validate duplicated topics
This commit is contained in:
commit
874f1f2428
|
@ -86,7 +86,7 @@ EMQX Cloud 文档:[docs.emqx.com/zh/cloud/latest/](https://docs.emqx.com/zh/cl
|
|||
|
||||
`master` 分支是最新的 5 版本,`main-v4.4` 是 4.4 版本。
|
||||
|
||||
EMQX 4.4 版本需要 OTP 24;5 版本则可以使用 OTP 24 和 25 构建。
|
||||
EMQX 4.4 版本需要 OTP 24;5 版本则可以使用 OTP 25 和 26 构建。
|
||||
|
||||
```bash
|
||||
git clone https://github.com/emqx/emqx.git
|
||||
|
|
|
@ -98,7 +98,7 @@ The `master` branch tracks the latest version 5. For version 4.4 checkout the `m
|
|||
|
||||
EMQX 4.4 requires OTP 24.
|
||||
EMQX 5.0 ~ 5.3 can be built with OTP 24 or 25.
|
||||
EMQX 5.4 and newer can be built with OTP 24 or 25.
|
||||
EMQX 5.4 and newer can be built with OTP 25 or 26.
|
||||
|
||||
```bash
|
||||
git clone https://github.com/emqx/emqx.git
|
||||
|
|
|
@ -65,6 +65,7 @@ fields(validation) ->
|
|||
#{
|
||||
desc => ?DESC("topics"),
|
||||
converter => fun ensure_array/2,
|
||||
validator => fun validate_unique_topics/1,
|
||||
required => true
|
||||
}
|
||||
)},
|
||||
|
@ -269,3 +270,23 @@ do_validate_unique_schema_checks(
|
|||
end;
|
||||
do_validate_unique_schema_checks([_Check | Rest], Seen, Duplicated) ->
|
||||
do_validate_unique_schema_checks(Rest, Seen, Duplicated).
|
||||
|
||||
validate_unique_topics(Topics) ->
|
||||
Grouped = maps:groups_from_list(
|
||||
fun(T) -> T end,
|
||||
Topics
|
||||
),
|
||||
DuplicatedMap = maps:filter(
|
||||
fun(_T, Ts) -> length(Ts) > 1 end,
|
||||
Grouped
|
||||
),
|
||||
case maps:keys(DuplicatedMap) of
|
||||
[] ->
|
||||
ok;
|
||||
Duplicated ->
|
||||
Msg = iolist_to_binary([
|
||||
<<"duplicated topics: ">>,
|
||||
lists:join(", ", Duplicated)
|
||||
]),
|
||||
{error, Msg}
|
||||
end.
|
||||
|
|
|
@ -232,6 +232,65 @@ check_test_() ->
|
|||
|
||||
duplicated_check_test_() ->
|
||||
[
|
||||
{"duplicated topics 1",
|
||||
?_assertThrow(
|
||||
{_Schema, [
|
||||
#{
|
||||
reason := <<"duplicated topics: t/1">>,
|
||||
kind := validation_error,
|
||||
path := "message_validation.validations.1.topics"
|
||||
}
|
||||
]},
|
||||
parse_and_check([
|
||||
validation(
|
||||
<<"foo">>,
|
||||
[schema_check(json, <<"a">>)],
|
||||
#{<<"topics">> => [<<"t/1">>, <<"t/1">>]}
|
||||
)
|
||||
])
|
||||
)},
|
||||
{"duplicated topics 2",
|
||||
?_assertThrow(
|
||||
{_Schema, [
|
||||
#{
|
||||
reason := <<"duplicated topics: t/1">>,
|
||||
kind := validation_error,
|
||||
path := "message_validation.validations.1.topics"
|
||||
}
|
||||
]},
|
||||
parse_and_check([
|
||||
validation(
|
||||
<<"foo">>,
|
||||
[schema_check(json, <<"a">>)],
|
||||
#{<<"topics">> => [<<"t/1">>, <<"t/#">>, <<"t/1">>]}
|
||||
)
|
||||
])
|
||||
)},
|
||||
{"duplicated topics 3",
|
||||
?_assertThrow(
|
||||
{_Schema, [
|
||||
#{
|
||||
reason := <<"duplicated topics: t/1, t/2">>,
|
||||
kind := validation_error,
|
||||
path := "message_validation.validations.1.topics"
|
||||
}
|
||||
]},
|
||||
parse_and_check([
|
||||
validation(
|
||||
<<"foo">>,
|
||||
[schema_check(json, <<"a">>)],
|
||||
#{
|
||||
<<"topics">> => [
|
||||
<<"t/1">>,
|
||||
<<"t/#">>,
|
||||
<<"t/1">>,
|
||||
<<"t/2">>,
|
||||
<<"t/2">>
|
||||
]
|
||||
}
|
||||
)
|
||||
])
|
||||
)},
|
||||
{"duplicated sql checks are not checked",
|
||||
?_assertMatch(
|
||||
[#{<<"checks">> := [_, _]}],
|
||||
|
|
|
@ -0,0 +1 @@
|
|||
Added a validation to prevent duplicated topics when configuring a message validation.
|
Loading…
Reference in New Issue