From c871b3745320b34bffb9f7b456beb2b413a07123 Mon Sep 17 00:00:00 2001 From: Serge Tupchii Date: Thu, 30 May 2024 13:48:00 +0300 Subject: [PATCH] fix(clusterlink): add link topics schema validator --- .../include/emqx_cluster_link.hrl | 2 + .../src/emqx_cluster_link_schema.erl | 40 ++++++++++++++++--- 2 files changed, 37 insertions(+), 5 deletions(-) diff --git a/apps/emqx_cluster_link/include/emqx_cluster_link.hrl b/apps/emqx_cluster_link/include/emqx_cluster_link.hrl index 3ee7e9fdf..08dc7f4ad 100644 --- a/apps/emqx_cluster_link/include/emqx_cluster_link.hrl +++ b/apps/emqx_cluster_link/include/emqx_cluster_link.hrl @@ -3,6 +3,8 @@ %%-------------------------------------------------------------------- -define(TOPIC_PREFIX, "$LINK/cluster/"). +-define(TOPIC_PREFIX_WILDCARD, <>). + -define(ROUTE_TOPIC_PREFIX, ?TOPIC_PREFIX "route/"). -define(MSG_TOPIC_PREFIX, ?TOPIC_PREFIX "msg/"). -define(RESP_TOPIC_PREFIX, ?TOPIC_PREFIX "resp/"). diff --git a/apps/emqx_cluster_link/src/emqx_cluster_link_schema.erl b/apps/emqx_cluster_link/src/emqx_cluster_link_schema.erl index 695b29330..22c9e31ec 100644 --- a/apps/emqx_cluster_link/src/emqx_cluster_link_schema.erl +++ b/apps/emqx_cluster_link/src/emqx_cluster_link_schema.erl @@ -6,6 +6,7 @@ -behaviour(emqx_schema_hooks). +-include("emqx_cluster_link.hrl"). -include_lib("hocon/include/hoconsc.hrl"). -export([injected_fields/0]). @@ -49,13 +50,16 @@ fields("link") -> %% - basic topic validation %% - non-overlapping (not intersecting) filters? %% (this may be not required, depends on config update implementation) - {topics, ?HOCON(?ARRAY(binary()), #{required => true})}, + {topics, + ?HOCON(?ARRAY(binary()), #{required => true, validator => fun topics_validator/1})}, {pool_size, ?HOCON(pos_integer(), #{default => emqx_vm:schedulers() * 2})} ]. desc(_) -> "todo". +%% TODO: check that no link name equals local cluster name, +%% but this may be tricky since the link config is injected into cluster config (emqx_conf_schema). links_validator(Links) -> {_, Dups} = lists:foldl( fun(Link, {Acc, DupAcc}) -> @@ -70,10 +74,36 @@ links_validator(Links) -> {#{}, []}, Links ), - case Dups of - [] -> ok; - _ -> {error, #{reason => duplicated_cluster_links, names => Dups}} - end. + check_errors(Dups, duplicated_cluster_links, names). link_name(#{upstream := Name}) -> Name; link_name(#{<<"upstream">> := Name}) -> Name. + +topics_validator(Topics) -> + Errors = lists:foldl( + fun(T, ErrAcc) -> + try + _ = emqx_topic:validate(T), + validate_sys_link_topic(T, ErrAcc) + catch + E:R -> + [{T, {E, R}} | ErrAcc] + end + end, + [], + Topics + ), + check_errors(Errors, invalid_topics, topics). + +validate_sys_link_topic(T, ErrAcc) -> + case emqx_topic:match(T, ?TOPIC_PREFIX_WILDCARD) of + true -> + [{T, {error, topic_not_allowed}} | ErrAcc]; + false -> + ErrAcc + end. + +check_errors([] = _Errors, _Reason, _ValuesField) -> + ok; +check_errors(Errors, Reason, ValuesField) -> + {error, #{reason => Reason, ValuesField => Errors}}.