chore(clusterlink): rename link `upstream` field to `name`

This commit is contained in:
Serge Tupchii 2024-06-14 19:48:40 +03:00
parent a95a08efd3
commit a905a6048c
11 changed files with 42 additions and 49 deletions

View File

@ -144,7 +144,7 @@ maybe_push_route_op(Op, Topic, RouteID) ->
maybe_push_route_op(Op, Topic, RouteID, PushFun) -> maybe_push_route_op(Op, Topic, RouteID, PushFun) ->
lists:foreach( lists:foreach(
fun(#{upstream := Cluster, topics := LinkFilters}) -> fun(#{name := Cluster, topics := LinkFilters}) ->
case topic_intersect_any(Topic, LinkFilters) of case topic_intersect_any(Topic, LinkFilters) of
false -> false ->
ok; ok;

View File

@ -99,7 +99,7 @@ links_config_example() ->
<<"t/topic-example">>, <<"t/topic-example">>,
<<"t/topic-filter-example/1/#">> <<"t/topic-filter-example/1/#">>
], ],
<<"upstream">> => <<"emqxcl_b">> <<"name">> => <<"emqxcl_b">>
}, },
#{ #{
<<"enable">> => true, <<"enable">> => true,
@ -111,6 +111,6 @@ links_config_example() ->
<<"t/topic-example">>, <<"t/topic-example">>,
<<"t/topic-filter-example/1/#">> <<"t/topic-filter-example/1/#">>
], ],
<<"upstream">> => <<"emqxcl_c">> <<"name">> => <<"emqxcl_c">>
} }
]. ].

View File

@ -48,7 +48,7 @@ start_msg_fwd_resources(LinksConf) ->
remove_msg_fwd_resources(LinksConf) -> remove_msg_fwd_resources(LinksConf) ->
lists:foreach( lists:foreach(
fun(#{upstream := Name}) -> fun(#{name := Name}) ->
emqx_cluster_link_mqtt:remove_msg_fwd_resource(Name) emqx_cluster_link_mqtt:remove_msg_fwd_resource(Name)
end, end,
LinksConf LinksConf

View File

@ -193,7 +193,7 @@ add_link(_DisabledLinkConf) ->
ok. ok.
remove_links(LinksConf) -> remove_links(LinksConf) ->
[remove_link(Name) || #{upstream := Name} <- LinksConf]. [remove_link(Name) || #{name := Name} <- LinksConf].
remove_link(Name) -> remove_link(Name) ->
_ = emqx_cluster_link_mqtt:remove_msg_fwd_resource(Name), _ = emqx_cluster_link_mqtt:remove_msg_fwd_resource(Name),
@ -202,7 +202,7 @@ remove_link(Name) ->
update_links(LinksConf) -> update_links(LinksConf) ->
[update_link(Link) || Link <- LinksConf]. [update_link(Link) || Link <- LinksConf].
update_link({OldLinkConf, #{enable := true, upstream := Name} = NewLinkConf}) -> update_link({OldLinkConf, #{enable := true, name := Name} = NewLinkConf}) ->
case what_is_changed(OldLinkConf, NewLinkConf) of case what_is_changed(OldLinkConf, NewLinkConf) of
both -> both ->
_ = ensure_actor_stopped(Name), _ = ensure_actor_stopped(Name),
@ -215,7 +215,7 @@ update_link({OldLinkConf, #{enable := true, upstream := Name} = NewLinkConf}) ->
msg_resource -> msg_resource ->
ok = update_msg_fwd_resource(OldLinkConf, NewLinkConf) ok = update_msg_fwd_resource(OldLinkConf, NewLinkConf)
end; end;
update_link({_OldLinkConf, #{enable := false, upstream := Name} = _NewLinkConf}) -> update_link({_OldLinkConf, #{enable := false, name := Name} = _NewLinkConf}) ->
_ = emqx_cluster_link_mqtt:remove_msg_fwd_resource(Name), _ = emqx_cluster_link_mqtt:remove_msg_fwd_resource(Name),
ensure_actor_stopped(Name). ensure_actor_stopped(Name).
@ -240,7 +240,7 @@ what_is_changed(OldLink, NewLink) ->
are_fields_changed(Fields, OldLink, NewLink) -> are_fields_changed(Fields, OldLink, NewLink) ->
maps:with(Fields, OldLink) =/= maps:with(Fields, NewLink). maps:with(Fields, OldLink) =/= maps:with(Fields, NewLink).
update_msg_fwd_resource(_, #{upstream := Name} = NewConf) -> update_msg_fwd_resource(_, #{name := Name} = NewConf) ->
_ = emqx_cluster_link_mqtt:remove_msg_fwd_resource(Name), _ = emqx_cluster_link_mqtt:remove_msg_fwd_resource(Name),
{ok, _} = emqx_cluster_link_mqtt:ensure_msg_fwd_resource(NewConf), {ok, _} = emqx_cluster_link_mqtt:ensure_msg_fwd_resource(NewConf),
ok. ok.
@ -248,8 +248,8 @@ update_msg_fwd_resource(_, #{upstream := Name} = NewConf) ->
ensure_actor_stopped(ClusterName) -> ensure_actor_stopped(ClusterName) ->
emqx_cluster_link_sup:ensure_actor_stopped(ClusterName). emqx_cluster_link_sup:ensure_actor_stopped(ClusterName).
upstream_name(#{upstream := N}) -> N; upstream_name(#{name := N}) -> N;
upstream_name(#{<<"upstream">> := N}) -> N. upstream_name(#{<<"name">> := N}) -> N.
maybe_increment_ps_actor_incr(New, Old) -> maybe_increment_ps_actor_incr(New, Old) ->
case emqx_persistent_message:is_persistence_enabled() of case emqx_persistent_message:is_persistence_enabled() of
@ -284,9 +284,9 @@ increment_ps_actor_incr(#{ps_actor_incarnation := Incr} = Conf) ->
increment_ps_actor_incr(#{<<"ps_actor_incarnation">> := Incr} = Conf) -> increment_ps_actor_incr(#{<<"ps_actor_incarnation">> := Incr} = Conf) ->
Conf#{<<"ps_actor_incarnation">> => Incr + 1}; Conf#{<<"ps_actor_incarnation">> => Incr + 1};
%% Default value set in schema is 0, so need to set it to 1 during the first update. %% Default value set in schema is 0, so need to set it to 1 during the first update.
increment_ps_actor_incr(#{<<"upstream">> := _} = Conf) -> increment_ps_actor_incr(#{<<"name">> := _} = Conf) ->
Conf#{<<"ps_actor_incarnation">> => 1}; Conf#{<<"ps_actor_incarnation">> => 1};
increment_ps_actor_incr(#{upstream := _} = Conf) -> increment_ps_actor_incr(#{name := _} = Conf) ->
Conf#{ps_actor_incarnation => 1}. Conf#{ps_actor_incarnation => 1}.
convert_certs(LinksConf) -> convert_certs(LinksConf) ->

View File

@ -80,16 +80,9 @@
-define(PUB_TIMEOUT, 10_000). -define(PUB_TIMEOUT, 10_000).
-spec ensure_msg_fwd_resource(binary() | map()) -> -spec ensure_msg_fwd_resource(map()) ->
{ok, emqx_resource:resource_data() | already_started} | {error, Reason :: term()}. {ok, emqx_resource:resource_data() | already_started} | {error, Reason :: term()}.
ensure_msg_fwd_resource(ClusterName) when is_binary(ClusterName) -> ensure_msg_fwd_resource(#{name := Name, resource_opts := ResOpts} = ClusterConf) ->
case emqx_cluster_link_config:link(ClusterName) of
#{} = Conf ->
ensure_msg_fwd_resource(Conf);
undefined ->
{error, link_config_not_found}
end;
ensure_msg_fwd_resource(#{upstream := Name, resource_opts := ResOpts} = ClusterConf) ->
ResOpts1 = ResOpts#{ ResOpts1 = ResOpts#{
query_mode => async, query_mode => async,
start_after_created => true start_after_created => true

View File

@ -144,7 +144,7 @@ status(Cluster) ->
%% 1. Actor + MQTT Client %% 1. Actor + MQTT Client
%% 2. Syncer %% 2. Syncer
start_link(#{upstream := TargetCluster} = LinkConf) -> start_link(#{name := TargetCluster} = LinkConf) ->
supervisor:start_link(?REF(TargetCluster), ?MODULE, {sup, LinkConf}). supervisor:start_link(?REF(TargetCluster), ?MODULE, {sup, LinkConf}).
%% Actor %% Actor
@ -290,7 +290,7 @@ init({sup, LinkConf}) ->
init({actor, State}) -> init({actor, State}) ->
init_actor(State). init_actor(State).
child_spec(actor, #{upstream := TargetCluster} = LinkConf) -> child_spec(actor, #{name := TargetCluster} = LinkConf) ->
%% Actor process. %% Actor process.
%% Wraps MQTT Client process. %% Wraps MQTT Client process.
%% ClientID: `mycluster:emqx1@emqx.local:routesync` %% ClientID: `mycluster:emqx1@emqx.local:routesync`
@ -299,7 +299,7 @@ child_spec(actor, #{upstream := TargetCluster} = LinkConf) ->
Actor = get_actor_id(), Actor = get_actor_id(),
Incarnation = new_incarnation(), Incarnation = new_incarnation(),
actor_spec(actor, ?ACTOR_REF(TargetCluster), Actor, Incarnation, LinkConf); actor_spec(actor, ?ACTOR_REF(TargetCluster), Actor, Incarnation, LinkConf);
child_spec(ps_actor, #{upstream := TargetCluster, ps_actor_incarnation := Incr} = LinkConf) -> child_spec(ps_actor, #{name := TargetCluster, ps_actor_incarnation := Incr} = LinkConf) ->
actor_spec(ps_actor, ?PS_ACTOR_REF(TargetCluster), ?PS_ACTOR, Incr, LinkConf). actor_spec(ps_actor, ?PS_ACTOR_REF(TargetCluster), ?PS_ACTOR, Incr, LinkConf).
child_spec(syncer, ?PS_ACTOR, Incarnation, TargetCluster) -> child_spec(syncer, ?PS_ACTOR, Incarnation, TargetCluster) ->
@ -331,7 +331,7 @@ syncer_spec(ChildID, Actor, Incarnation, SyncerRef, ClientName) ->
type => worker type => worker
}. }.
mk_state(#{upstream := TargetCluster} = LinkConf, Actor, Incarnation) -> mk_state(#{name := TargetCluster} = LinkConf, Actor, Incarnation) ->
#st{ #st{
target = TargetCluster, target = TargetCluster,
actor = Actor, actor = Actor,

View File

@ -40,7 +40,7 @@ links_schema(Meta) ->
fields("link") -> fields("link") ->
[ [
{enable, ?HOCON(boolean(), #{default => true, desc => ?DESC(enable)})}, {enable, ?HOCON(boolean(), #{default => true, desc => ?DESC(enable)})},
{upstream, ?HOCON(binary(), #{required => true, desc => ?DESC(upstream)})}, {name, ?HOCON(binary(), #{required => true, desc => ?DESC(link_name)})},
{server, {server,
emqx_schema:servers_sc(#{required => true, desc => ?DESC(server)}, ?MQTT_HOST_OPTS)}, emqx_schema:servers_sc(#{required => true, desc => ?DESC(server)}, ?MQTT_HOST_OPTS)},
{clientid, ?HOCON(binary(), #{desc => ?DESC(clientid)})}, {clientid, ?HOCON(binary(), #{desc => ?DESC(clientid)})},
@ -121,8 +121,8 @@ links_validator(Links) ->
), ),
check_errors(Dups, duplicated_cluster_links, names). check_errors(Dups, duplicated_cluster_links, names).
link_name(#{upstream := Name}) -> Name; link_name(#{name := Name}) -> Name;
link_name(#{<<"upstream">> := Name}) -> Name. link_name(#{<<"name">> := Name}) -> Name.
topics_validator(Topics) -> topics_validator(Topics) ->
Errors = lists:foldl( Errors = lists:foldl(

View File

@ -30,7 +30,7 @@ init(LinksConf) ->
ExtrouterGC = extrouter_gc_spec(), ExtrouterGC = extrouter_gc_spec(),
RouteActors = [ RouteActors = [
sup_spec(Name, ?ACTOR_MODULE, [LinkConf]) sup_spec(Name, ?ACTOR_MODULE, [LinkConf])
|| #{upstream := Name} = LinkConf <- LinksConf || #{name := Name} = LinkConf <- LinksConf
], ],
{ok, {SupFlags, [ExtrouterGC | RouteActors]}}. {ok, {SupFlags, [ExtrouterGC | RouteActors]}}.
@ -53,7 +53,7 @@ sup_spec(Id, Mod, Args) ->
modules => [Mod] modules => [Mod]
}. }.
ensure_actor(#{upstream := Name} = LinkConf) -> ensure_actor(#{name := Name} = LinkConf) ->
case supervisor:start_child(?SERVER, sup_spec(Name, ?ACTOR_MODULE, [LinkConf])) of case supervisor:start_child(?SERVER, sup_spec(Name, ?ACTOR_MODULE, [LinkConf])) of
{ok, Pid} -> {ok, Pid} ->
{ok, Pid}; {ok, Pid};

View File

@ -55,7 +55,7 @@ mk_source_cluster(BaseName, Config) ->
"\n name = cl.source" "\n name = cl.source"
"\n links = [" "\n links = ["
"\n { enable = true" "\n { enable = true"
"\n upstream = cl.target" "\n name = cl.target"
"\n server = \"localhost:31883\"" "\n server = \"localhost:31883\""
"\n clientid = client.source" "\n clientid = client.source"
"\n topics = []" "\n topics = []"
@ -77,7 +77,7 @@ mk_target_cluster(BaseName, Config) ->
"\n name = cl.target" "\n name = cl.target"
"\n links = [" "\n links = ["
"\n { enable = true" "\n { enable = true"
"\n upstream = cl.source" "\n name = cl.source"
"\n server = \"localhost:41883\"" "\n server = \"localhost:41883\""
"\n clientid = client.target" "\n clientid = client.target"
"\n topics = [\"#\"]" "\n topics = [\"#\"]"

View File

@ -101,14 +101,14 @@ t_config_update(Config) ->
<<"pool_size">> => 1, <<"pool_size">> => 1,
<<"server">> => <<"localhost:", (integer_to_binary(LPortB))/binary>>, <<"server">> => <<"localhost:", (integer_to_binary(LPortB))/binary>>,
<<"topics">> => [<<"t/test-topic">>, <<"t/test/#">>], <<"topics">> => [<<"t/test-topic">>, <<"t/test/#">>],
<<"upstream">> => NameB <<"name">> => NameB
}, },
LinkConfB = #{ LinkConfB = #{
<<"enable">> => true, <<"enable">> => true,
<<"pool_size">> => 1, <<"pool_size">> => 1,
<<"server">> => <<"localhost:", (integer_to_binary(LPortA))/binary>>, <<"server">> => <<"localhost:", (integer_to_binary(LPortA))/binary>>,
<<"topics">> => [<<"t/test-topic">>, <<"t/test/#">>], <<"topics">> => [<<"t/test-topic">>, <<"t/test/#">>],
<<"upstream">> => NameA <<"name">> => NameA
}, },
{ok, SubRef} = snabbkaffe:subscribe( {ok, SubRef} = snabbkaffe:subscribe(
@ -242,7 +242,7 @@ t_config_validations(Config) ->
<<"pool_size">> => 1, <<"pool_size">> => 1,
<<"server">> => <<"localhost:", (integer_to_binary(LPortB))/binary>>, <<"server">> => <<"localhost:", (integer_to_binary(LPortB))/binary>>,
<<"topics">> => [<<"t/test-topic">>, <<"t/test/#">>], <<"topics">> => [<<"t/test-topic">>, <<"t/test/#">>],
<<"upstream">> => NameB <<"name">> => NameB
}, },
DuplicatedLinks = [LinkConfA, LinkConfA#{<<"enable">> => false, <<"pool_size">> => 2}], DuplicatedLinks = [LinkConfA, LinkConfA#{<<"enable">> => false, <<"pool_size">> => 2}],
?assertMatch( ?assertMatch(
@ -267,7 +267,7 @@ t_config_validations(Config) ->
?assertMatch( ?assertMatch(
{error, #{reason := required_field}}, {error, #{reason := required_field}},
erpc:call(NodeA, emqx_cluster_link_config, update, [ erpc:call(NodeA, emqx_cluster_link_config, update, [
[maps:remove(<<"upstream">>, LinkConfA)] [maps:remove(<<"name">>, LinkConfA)]
]) ])
), ),
?assertMatch( ?assertMatch(
@ -285,7 +285,7 @@ t_config_validations(Config) ->
erpc:call(NodeA, emqx_cluster_link_config, update, [[LinkConfA]]) erpc:call(NodeA, emqx_cluster_link_config, update, [[LinkConfA]])
), ),
LinkConfUnknown = LinkConfA#{ LinkConfUnknown = LinkConfA#{
<<"upstream">> => <<"no-cluster">>, <<"server">> => <<"no-cluster.emqx:31883">> <<"name">> => <<"no-cluster">>, <<"server">> => <<"no-cluster.emqx:31883">>
}, },
?assertMatch( ?assertMatch(
{ok, _}, {ok, _},
@ -365,14 +365,14 @@ t_config_update_ds(Config) ->
<<"pool_size">> => 1, <<"pool_size">> => 1,
<<"server">> => <<"localhost:", (integer_to_binary(LPortB))/binary>>, <<"server">> => <<"localhost:", (integer_to_binary(LPortB))/binary>>,
<<"topics">> => [<<"t/test-topic">>, <<"t/test/#">>], <<"topics">> => [<<"t/test-topic">>, <<"t/test/#">>],
<<"upstream">> => NameB <<"name">> => NameB
}, },
LinkConfB = #{ LinkConfB = #{
<<"enable">> => true, <<"enable">> => true,
<<"pool_size">> => 1, <<"pool_size">> => 1,
<<"server">> => <<"localhost:", (integer_to_binary(LPortA))/binary>>, <<"server">> => <<"localhost:", (integer_to_binary(LPortA))/binary>>,
<<"topics">> => [<<"t/test-topic">>, <<"t/test/#">>], <<"topics">> => [<<"t/test-topic">>, <<"t/test/#">>],
<<"upstream">> => NameA <<"name">> => NameA
}, },
{ok, SubRef} = snabbkaffe:subscribe( {ok, SubRef} = snabbkaffe:subscribe(
@ -500,14 +500,14 @@ t_misconfigured_links(Config) ->
<<"pool_size">> => 1, <<"pool_size">> => 1,
<<"server">> => <<"localhost:", (integer_to_binary(LPortB))/binary>>, <<"server">> => <<"localhost:", (integer_to_binary(LPortB))/binary>>,
<<"topics">> => [<<"t/test-topic">>, <<"t/test/#">>], <<"topics">> => [<<"t/test-topic">>, <<"t/test/#">>],
<<"upstream">> => <<"bad-b-name">> <<"name">> => <<"bad-b-name">>
}, },
LinkConfB = #{ LinkConfB = #{
<<"enable">> => true, <<"enable">> => true,
<<"pool_size">> => 1, <<"pool_size">> => 1,
<<"server">> => <<"localhost:", (integer_to_binary(LPortA))/binary>>, <<"server">> => <<"localhost:", (integer_to_binary(LPortA))/binary>>,
<<"topics">> => [<<"t/test-topic">>, <<"t/test/#">>], <<"topics">> => [<<"t/test-topic">>, <<"t/test/#">>],
<<"upstream">> => NameA <<"name">> => NameA
}, },
?assertMatch({ok, _}, erpc:call(NodeB1, emqx_cluster_link_config, update, [[LinkConfB]])), ?assertMatch({ok, _}, erpc:call(NodeB1, emqx_cluster_link_config, update, [[LinkConfB]])),
@ -528,7 +528,7 @@ t_misconfigured_links(Config) ->
), ),
{{ok, _}, {ok, _}} = ?wait_async_action( {{ok, _}, {ok, _}} = ?wait_async_action(
erpc:call(NodeA1, emqx_cluster_link_config, update, [[LinkConfA#{<<"upstream">> => NameB}]]), erpc:call(NodeA1, emqx_cluster_link_config, update, [[LinkConfA#{<<"name">> => NameB}]]),
#{ #{
?snk_kind := clink_route_bootstrap_complete, ?snk_kind := clink_route_bootstrap_complete,
?snk_meta := #{node := NodeA1} ?snk_meta := #{node := NodeA1}
@ -554,7 +554,7 @@ t_misconfigured_links(Config) ->
LinkConfB#{<<"enable">> => false}, LinkConfB#{<<"enable">> => false},
%% An extra dummy link to keep B hook/external_broker registered and be able to %% An extra dummy link to keep B hook/external_broker registered and be able to
%% respond with "link disabled error" for the first disabled link %% respond with "link disabled error" for the first disabled link
LinkConfB#{<<"upstream">> => <<"bad-a-name">>} LinkConfB#{<<"name">> => <<"bad-a-name">>}
] ]
] ]
) )
@ -562,7 +562,7 @@ t_misconfigured_links(Config) ->
?assertMatch({ok, _}, erpc:call(NodeA1, emqx_cluster_link_config, update, [[]])), ?assertMatch({ok, _}, erpc:call(NodeA1, emqx_cluster_link_config, update, [[]])),
{{ok, _}, {ok, _}} = ?wait_async_action( {{ok, _}, {ok, _}} = ?wait_async_action(
erpc:call(NodeA1, emqx_cluster_link_config, update, [[LinkConfA#{<<"upstream">> => NameB}]]), erpc:call(NodeA1, emqx_cluster_link_config, update, [[LinkConfA#{<<"name">> => NameB}]]),
#{ #{
?snk_kind := clink_handshake_error, ?snk_kind := clink_handshake_error,
reason := <<"cluster_link_disabled">>, reason := <<"cluster_link_disabled">>,
@ -579,13 +579,13 @@ t_misconfigured_links(Config) ->
?assertMatch( ?assertMatch(
{ok, _}, {ok, _},
erpc:call(NodeB1, emqx_cluster_link_config, update, [ erpc:call(NodeB1, emqx_cluster_link_config, update, [
[LinkConfB#{<<"upstream">> => <<"bad-a-name">>}] [LinkConfB#{<<"name">> => <<"bad-a-name">>}]
]) ])
), ),
?assertMatch({ok, _}, erpc:call(NodeA1, emqx_cluster_link_config, update, [[]])), ?assertMatch({ok, _}, erpc:call(NodeA1, emqx_cluster_link_config, update, [[]])),
{{ok, _}, {ok, _}} = ?wait_async_action( {{ok, _}, {ok, _}} = ?wait_async_action(
erpc:call(NodeA1, emqx_cluster_link_config, update, [[LinkConfA#{<<"upstream">> => NameB}]]), erpc:call(NodeA1, emqx_cluster_link_config, update, [[LinkConfA#{<<"name">> => NameB}]]),
#{ #{
?snk_kind := clink_handshake_error, ?snk_kind := clink_handshake_error,
reason := <<"unknown_cluster">>, reason := <<"unknown_cluster">>,

View File

@ -12,9 +12,9 @@ enable.desc:
"""Enable or disable a cluster link. The link is enabled by default, disabling it allows stopping the link without removing its configuration. The link must be enabled on both sides to be operational. Disabling the link should also be done on both clusters in order to free up all associated resources.""" """Enable or disable a cluster link. The link is enabled by default, disabling it allows stopping the link without removing its configuration. The link must be enabled on both sides to be operational. Disabling the link should also be done on both clusters in order to free up all associated resources."""
enable.label: "Enable" enable.label: "Enable"
upstream.desc: link_name.desc:
"""Upstream cluster name. Must be exactly equal to the value of `cluster.name` configured at the remote cluster. Must not be equal to the local cluster.name. All configured cluster link upstream names must be unique.""" """Linked (remote) cluster name. Must be exactly equal to the value of `cluster.name` configured at the remote cluster. Must not be equal to the local cluster.name. All configured cluster link names must be unique."""
upstream.label: "Upstream Name" link_name.label: "Linked Cluster Name"
server.desc: server.desc:
"""MQTT host and port of the remote EMQX broker.""" """MQTT host and port of the remote EMQX broker."""