fix(clusterlink): valide config to disallow duplicated cluster links

This commit is contained in:
Serge Tupchii 2024-05-29 13:37:06 +03:00
parent 54d51d0982
commit 58eaf07627
2 changed files with 26 additions and 4 deletions

View File

@ -169,7 +169,6 @@ remove_link(_LinkConf) ->
update_links(LinksConf) -> update_links(LinksConf) ->
[update_link(Link) || Link <- LinksConf]. [update_link(Link) || Link <- LinksConf].
%% TODO: do some updates without restart (at least without coordinator restart and re-election)
update_link(#{enabled := true} = LinkConf) -> update_link(#{enabled := true} = LinkConf) ->
_ = remove_link(LinkConf), _ = remove_link(LinkConf),
add_link(LinkConf); add_link(LinkConf);

View File

@ -28,8 +28,8 @@ injected_fields() ->
fields("cluster_linking") -> fields("cluster_linking") ->
[ [
%% TODO: validate and ensure upstream names are unique! {links,
{links, ?HOCON(?ARRAY(?R_REF("link")), #{default => []})} ?HOCON(?ARRAY(?R_REF("link")), #{default => [], validator => fun links_validator/1})}
]; ];
fields("link") -> fields("link") ->
[ [
@ -47,10 +47,33 @@ fields("link") ->
}}, }},
%% TODO: validate topics: %% TODO: validate topics:
%% - basic topic validation %% - basic topic validation
%% - non-overlapping (not intersecting) filters ? %% - non-overlapping (not intersecting) filters?
%% (this may be not required, depends on config update implementation)
{topics, ?HOCON(?ARRAY(binary()), #{required => true})}, {topics, ?HOCON(?ARRAY(binary()), #{required => true})},
{pool_size, ?HOCON(pos_integer(), #{default => emqx_vm:schedulers() * 2})} {pool_size, ?HOCON(pos_integer(), #{default => emqx_vm:schedulers() * 2})}
]. ].
desc(_) -> desc(_) ->
"todo". "todo".
links_validator(Links) ->
{_, Dups} = lists:foldl(
fun(Link, {Acc, DupAcc}) ->
Name = link_name(Link),
case Acc of
#{Name := _} ->
{Acc, [Name | DupAcc]};
_ ->
{Acc#{Name => undefined}, DupAcc}
end
end,
{#{}, []},
Links
),
case Dups of
[] -> ok;
_ -> {error, #{reason => duplicated_cluster_links, names => Dups}}
end.
link_name(#{upstream := Name}) -> Name;
link_name(#{<<"upstream">> := Name}) -> Name.