fix(clusterlink): add link topics schema validator

This commit is contained in:
Serge Tupchii 2024-05-30 13:48:00 +03:00
parent 780a0bf807
commit c871b37453
2 changed files with 37 additions and 5 deletions

View File

@ -3,6 +3,8 @@
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
-define(TOPIC_PREFIX, "$LINK/cluster/"). -define(TOPIC_PREFIX, "$LINK/cluster/").
-define(TOPIC_PREFIX_WILDCARD, <<?TOPIC_PREFIX "#">>).
-define(ROUTE_TOPIC_PREFIX, ?TOPIC_PREFIX "route/"). -define(ROUTE_TOPIC_PREFIX, ?TOPIC_PREFIX "route/").
-define(MSG_TOPIC_PREFIX, ?TOPIC_PREFIX "msg/"). -define(MSG_TOPIC_PREFIX, ?TOPIC_PREFIX "msg/").
-define(RESP_TOPIC_PREFIX, ?TOPIC_PREFIX "resp/"). -define(RESP_TOPIC_PREFIX, ?TOPIC_PREFIX "resp/").

View File

@ -6,6 +6,7 @@
-behaviour(emqx_schema_hooks). -behaviour(emqx_schema_hooks).
-include("emqx_cluster_link.hrl").
-include_lib("hocon/include/hoconsc.hrl"). -include_lib("hocon/include/hoconsc.hrl").
-export([injected_fields/0]). -export([injected_fields/0]).
@ -49,13 +50,16 @@ fields("link") ->
%% - basic topic validation %% - basic topic validation
%% - non-overlapping (not intersecting) filters? %% - non-overlapping (not intersecting) filters?
%% (this may be not required, depends on config update implementation) %% (this may be not required, depends on config update implementation)
{topics, ?HOCON(?ARRAY(binary()), #{required => true})}, {topics,
?HOCON(?ARRAY(binary()), #{required => true, validator => fun topics_validator/1})},
{pool_size, ?HOCON(pos_integer(), #{default => emqx_vm:schedulers() * 2})} {pool_size, ?HOCON(pos_integer(), #{default => emqx_vm:schedulers() * 2})}
]. ].
desc(_) -> desc(_) ->
"todo". "todo".
%% TODO: check that no link name equals local cluster name,
%% but this may be tricky since the link config is injected into cluster config (emqx_conf_schema).
links_validator(Links) -> links_validator(Links) ->
{_, Dups} = lists:foldl( {_, Dups} = lists:foldl(
fun(Link, {Acc, DupAcc}) -> fun(Link, {Acc, DupAcc}) ->
@ -70,10 +74,36 @@ links_validator(Links) ->
{#{}, []}, {#{}, []},
Links Links
), ),
case Dups of check_errors(Dups, duplicated_cluster_links, names).
[] -> ok;
_ -> {error, #{reason => duplicated_cluster_links, names => Dups}}
end.
link_name(#{upstream := Name}) -> Name; link_name(#{upstream := Name}) -> Name;
link_name(#{<<"upstream">> := Name}) -> Name. link_name(#{<<"upstream">> := Name}) -> Name.
topics_validator(Topics) ->
Errors = lists:foldl(
fun(T, ErrAcc) ->
try
_ = emqx_topic:validate(T),
validate_sys_link_topic(T, ErrAcc)
catch
E:R ->
[{T, {E, R}} | ErrAcc]
end
end,
[],
Topics
),
check_errors(Errors, invalid_topics, topics).
validate_sys_link_topic(T, ErrAcc) ->
case emqx_topic:match(T, ?TOPIC_PREFIX_WILDCARD) of
true ->
[{T, {error, topic_not_allowed}} | ErrAcc];
false ->
ErrAcc
end.
check_errors([] = _Errors, _Reason, _ValuesField) ->
ok;
check_errors(Errors, Reason, ValuesField) ->
{error, #{reason => Reason, ValuesField => Errors}}.