diff --git a/apps/emqx_cluster_link/src/emqx_cluster_link.erl b/apps/emqx_cluster_link/src/emqx_cluster_link.erl index d91b33c3a..76228c052 100644 --- a/apps/emqx_cluster_link/src/emqx_cluster_link.erl +++ b/apps/emqx_cluster_link/src/emqx_cluster_link.erl @@ -144,7 +144,7 @@ maybe_push_route_op(Op, Topic, RouteID) -> maybe_push_route_op(Op, Topic, RouteID, PushFun) -> lists:foreach( - fun(#{upstream := Cluster, topics := LinkFilters}) -> + fun(#{name := Cluster, topics := LinkFilters}) -> case topic_intersect_any(Topic, LinkFilters) of false -> ok; diff --git a/apps/emqx_cluster_link/src/emqx_cluster_link_api.erl b/apps/emqx_cluster_link/src/emqx_cluster_link_api.erl index c74d2d3f7..33634607e 100644 --- a/apps/emqx_cluster_link/src/emqx_cluster_link_api.erl +++ b/apps/emqx_cluster_link/src/emqx_cluster_link_api.erl @@ -99,7 +99,7 @@ links_config_example() -> <<"t/topic-example">>, <<"t/topic-filter-example/1/#">> ], - <<"upstream">> => <<"emqxcl_b">> + <<"name">> => <<"emqxcl_b">> }, #{ <<"enable">> => true, @@ -111,6 +111,6 @@ links_config_example() -> <<"t/topic-example">>, <<"t/topic-filter-example/1/#">> ], - <<"upstream">> => <<"emqxcl_c">> + <<"name">> => <<"emqxcl_c">> } ]. diff --git a/apps/emqx_cluster_link/src/emqx_cluster_link_app.erl b/apps/emqx_cluster_link/src/emqx_cluster_link_app.erl index ddf3028a2..41f1a0a77 100644 --- a/apps/emqx_cluster_link/src/emqx_cluster_link_app.erl +++ b/apps/emqx_cluster_link/src/emqx_cluster_link_app.erl @@ -48,7 +48,7 @@ start_msg_fwd_resources(LinksConf) -> remove_msg_fwd_resources(LinksConf) -> lists:foreach( - fun(#{upstream := Name}) -> + fun(#{name := Name}) -> emqx_cluster_link_mqtt:remove_msg_fwd_resource(Name) end, LinksConf diff --git a/apps/emqx_cluster_link/src/emqx_cluster_link_config.erl b/apps/emqx_cluster_link/src/emqx_cluster_link_config.erl index 2b5dea2e8..f27c7702e 100644 --- a/apps/emqx_cluster_link/src/emqx_cluster_link_config.erl +++ b/apps/emqx_cluster_link/src/emqx_cluster_link_config.erl @@ -193,7 +193,7 @@ add_link(_DisabledLinkConf) -> ok. remove_links(LinksConf) -> - [remove_link(Name) || #{upstream := Name} <- LinksConf]. + [remove_link(Name) || #{name := Name} <- LinksConf]. remove_link(Name) -> _ = emqx_cluster_link_mqtt:remove_msg_fwd_resource(Name), @@ -202,7 +202,7 @@ remove_link(Name) -> update_links(LinksConf) -> [update_link(Link) || Link <- LinksConf]. -update_link({OldLinkConf, #{enable := true, upstream := Name} = NewLinkConf}) -> +update_link({OldLinkConf, #{enable := true, name := Name} = NewLinkConf}) -> case what_is_changed(OldLinkConf, NewLinkConf) of both -> _ = ensure_actor_stopped(Name), @@ -215,7 +215,7 @@ update_link({OldLinkConf, #{enable := true, upstream := Name} = NewLinkConf}) -> msg_resource -> ok = update_msg_fwd_resource(OldLinkConf, NewLinkConf) end; -update_link({_OldLinkConf, #{enable := false, upstream := Name} = _NewLinkConf}) -> +update_link({_OldLinkConf, #{enable := false, name := Name} = _NewLinkConf}) -> _ = emqx_cluster_link_mqtt:remove_msg_fwd_resource(Name), ensure_actor_stopped(Name). @@ -240,7 +240,7 @@ what_is_changed(OldLink, NewLink) -> are_fields_changed(Fields, OldLink, NewLink) -> maps:with(Fields, OldLink) =/= maps:with(Fields, NewLink). -update_msg_fwd_resource(_, #{upstream := Name} = NewConf) -> +update_msg_fwd_resource(_, #{name := Name} = NewConf) -> _ = emqx_cluster_link_mqtt:remove_msg_fwd_resource(Name), {ok, _} = emqx_cluster_link_mqtt:ensure_msg_fwd_resource(NewConf), ok. @@ -248,8 +248,8 @@ update_msg_fwd_resource(_, #{upstream := Name} = NewConf) -> ensure_actor_stopped(ClusterName) -> emqx_cluster_link_sup:ensure_actor_stopped(ClusterName). -upstream_name(#{upstream := N}) -> N; -upstream_name(#{<<"upstream">> := N}) -> N. +upstream_name(#{name := N}) -> N; +upstream_name(#{<<"name">> := N}) -> N. maybe_increment_ps_actor_incr(New, Old) -> case emqx_persistent_message:is_persistence_enabled() of @@ -284,9 +284,9 @@ increment_ps_actor_incr(#{ps_actor_incarnation := Incr} = Conf) -> increment_ps_actor_incr(#{<<"ps_actor_incarnation">> := Incr} = Conf) -> Conf#{<<"ps_actor_incarnation">> => Incr + 1}; %% Default value set in schema is 0, so need to set it to 1 during the first update. -increment_ps_actor_incr(#{<<"upstream">> := _} = Conf) -> +increment_ps_actor_incr(#{<<"name">> := _} = Conf) -> Conf#{<<"ps_actor_incarnation">> => 1}; -increment_ps_actor_incr(#{upstream := _} = Conf) -> +increment_ps_actor_incr(#{name := _} = Conf) -> Conf#{ps_actor_incarnation => 1}. convert_certs(LinksConf) -> diff --git a/apps/emqx_cluster_link/src/emqx_cluster_link_mqtt.erl b/apps/emqx_cluster_link/src/emqx_cluster_link_mqtt.erl index 7a8bf1dff..5185803b6 100644 --- a/apps/emqx_cluster_link/src/emqx_cluster_link_mqtt.erl +++ b/apps/emqx_cluster_link/src/emqx_cluster_link_mqtt.erl @@ -80,16 +80,9 @@ -define(PUB_TIMEOUT, 10_000). --spec ensure_msg_fwd_resource(binary() | map()) -> +-spec ensure_msg_fwd_resource(map()) -> {ok, emqx_resource:resource_data() | already_started} | {error, Reason :: term()}. -ensure_msg_fwd_resource(ClusterName) when is_binary(ClusterName) -> - case emqx_cluster_link_config:link(ClusterName) of - #{} = Conf -> - ensure_msg_fwd_resource(Conf); - undefined -> - {error, link_config_not_found} - end; -ensure_msg_fwd_resource(#{upstream := Name, resource_opts := ResOpts} = ClusterConf) -> +ensure_msg_fwd_resource(#{name := Name, resource_opts := ResOpts} = ClusterConf) -> ResOpts1 = ResOpts#{ query_mode => async, start_after_created => true diff --git a/apps/emqx_cluster_link/src/emqx_cluster_link_router_syncer.erl b/apps/emqx_cluster_link/src/emqx_cluster_link_router_syncer.erl index b7d165419..6808da0bd 100644 --- a/apps/emqx_cluster_link/src/emqx_cluster_link_router_syncer.erl +++ b/apps/emqx_cluster_link/src/emqx_cluster_link_router_syncer.erl @@ -144,7 +144,7 @@ status(Cluster) -> %% 1. Actor + MQTT Client %% 2. Syncer -start_link(#{upstream := TargetCluster} = LinkConf) -> +start_link(#{name := TargetCluster} = LinkConf) -> supervisor:start_link(?REF(TargetCluster), ?MODULE, {sup, LinkConf}). %% Actor @@ -290,7 +290,7 @@ init({sup, LinkConf}) -> init({actor, State}) -> init_actor(State). -child_spec(actor, #{upstream := TargetCluster} = LinkConf) -> +child_spec(actor, #{name := TargetCluster} = LinkConf) -> %% Actor process. %% Wraps MQTT Client process. %% ClientID: `mycluster:emqx1@emqx.local:routesync` @@ -299,7 +299,7 @@ child_spec(actor, #{upstream := TargetCluster} = LinkConf) -> Actor = get_actor_id(), Incarnation = new_incarnation(), actor_spec(actor, ?ACTOR_REF(TargetCluster), Actor, Incarnation, LinkConf); -child_spec(ps_actor, #{upstream := TargetCluster, ps_actor_incarnation := Incr} = LinkConf) -> +child_spec(ps_actor, #{name := TargetCluster, ps_actor_incarnation := Incr} = LinkConf) -> actor_spec(ps_actor, ?PS_ACTOR_REF(TargetCluster), ?PS_ACTOR, Incr, LinkConf). child_spec(syncer, ?PS_ACTOR, Incarnation, TargetCluster) -> @@ -331,7 +331,7 @@ syncer_spec(ChildID, Actor, Incarnation, SyncerRef, ClientName) -> type => worker }. -mk_state(#{upstream := TargetCluster} = LinkConf, Actor, Incarnation) -> +mk_state(#{name := TargetCluster} = LinkConf, Actor, Incarnation) -> #st{ target = TargetCluster, actor = Actor, diff --git a/apps/emqx_cluster_link/src/emqx_cluster_link_schema.erl b/apps/emqx_cluster_link/src/emqx_cluster_link_schema.erl index b6d0fbcda..f46249a4f 100644 --- a/apps/emqx_cluster_link/src/emqx_cluster_link_schema.erl +++ b/apps/emqx_cluster_link/src/emqx_cluster_link_schema.erl @@ -40,7 +40,7 @@ links_schema(Meta) -> fields("link") -> [ {enable, ?HOCON(boolean(), #{default => true, desc => ?DESC(enable)})}, - {upstream, ?HOCON(binary(), #{required => true, desc => ?DESC(upstream)})}, + {name, ?HOCON(binary(), #{required => true, desc => ?DESC(link_name)})}, {server, emqx_schema:servers_sc(#{required => true, desc => ?DESC(server)}, ?MQTT_HOST_OPTS)}, {clientid, ?HOCON(binary(), #{desc => ?DESC(clientid)})}, @@ -121,8 +121,8 @@ links_validator(Links) -> ), check_errors(Dups, duplicated_cluster_links, names). -link_name(#{upstream := Name}) -> Name; -link_name(#{<<"upstream">> := Name}) -> Name. +link_name(#{name := Name}) -> Name; +link_name(#{<<"name">> := Name}) -> Name. topics_validator(Topics) -> Errors = lists:foldl( diff --git a/apps/emqx_cluster_link/src/emqx_cluster_link_sup.erl b/apps/emqx_cluster_link/src/emqx_cluster_link_sup.erl index 0991583e2..2025510fc 100644 --- a/apps/emqx_cluster_link/src/emqx_cluster_link_sup.erl +++ b/apps/emqx_cluster_link/src/emqx_cluster_link_sup.erl @@ -30,7 +30,7 @@ init(LinksConf) -> ExtrouterGC = extrouter_gc_spec(), RouteActors = [ sup_spec(Name, ?ACTOR_MODULE, [LinkConf]) - || #{upstream := Name} = LinkConf <- LinksConf + || #{name := Name} = LinkConf <- LinksConf ], {ok, {SupFlags, [ExtrouterGC | RouteActors]}}. @@ -53,7 +53,7 @@ sup_spec(Id, Mod, Args) -> modules => [Mod] }. -ensure_actor(#{upstream := Name} = LinkConf) -> +ensure_actor(#{name := Name} = LinkConf) -> case supervisor:start_child(?SERVER, sup_spec(Name, ?ACTOR_MODULE, [LinkConf])) of {ok, Pid} -> {ok, Pid}; diff --git a/apps/emqx_cluster_link/test/emqx_cluster_link_SUITE.erl b/apps/emqx_cluster_link/test/emqx_cluster_link_SUITE.erl index e38cd3999..e023aacab 100644 --- a/apps/emqx_cluster_link/test/emqx_cluster_link_SUITE.erl +++ b/apps/emqx_cluster_link/test/emqx_cluster_link_SUITE.erl @@ -55,7 +55,7 @@ mk_source_cluster(BaseName, Config) -> "\n name = cl.source" "\n links = [" "\n { enable = true" - "\n upstream = cl.target" + "\n name = cl.target" "\n server = \"localhost:31883\"" "\n clientid = client.source" "\n topics = []" @@ -77,7 +77,7 @@ mk_target_cluster(BaseName, Config) -> "\n name = cl.target" "\n links = [" "\n { enable = true" - "\n upstream = cl.source" + "\n name = cl.source" "\n server = \"localhost:41883\"" "\n clientid = client.target" "\n topics = [\"#\"]" diff --git a/apps/emqx_cluster_link/test/emqx_cluster_link_config_SUITE.erl b/apps/emqx_cluster_link/test/emqx_cluster_link_config_SUITE.erl index e09f12ce4..97e62402c 100644 --- a/apps/emqx_cluster_link/test/emqx_cluster_link_config_SUITE.erl +++ b/apps/emqx_cluster_link/test/emqx_cluster_link_config_SUITE.erl @@ -101,14 +101,14 @@ t_config_update(Config) -> <<"pool_size">> => 1, <<"server">> => <<"localhost:", (integer_to_binary(LPortB))/binary>>, <<"topics">> => [<<"t/test-topic">>, <<"t/test/#">>], - <<"upstream">> => NameB + <<"name">> => NameB }, LinkConfB = #{ <<"enable">> => true, <<"pool_size">> => 1, <<"server">> => <<"localhost:", (integer_to_binary(LPortA))/binary>>, <<"topics">> => [<<"t/test-topic">>, <<"t/test/#">>], - <<"upstream">> => NameA + <<"name">> => NameA }, {ok, SubRef} = snabbkaffe:subscribe( @@ -242,7 +242,7 @@ t_config_validations(Config) -> <<"pool_size">> => 1, <<"server">> => <<"localhost:", (integer_to_binary(LPortB))/binary>>, <<"topics">> => [<<"t/test-topic">>, <<"t/test/#">>], - <<"upstream">> => NameB + <<"name">> => NameB }, DuplicatedLinks = [LinkConfA, LinkConfA#{<<"enable">> => false, <<"pool_size">> => 2}], ?assertMatch( @@ -267,7 +267,7 @@ t_config_validations(Config) -> ?assertMatch( {error, #{reason := required_field}}, erpc:call(NodeA, emqx_cluster_link_config, update, [ - [maps:remove(<<"upstream">>, LinkConfA)] + [maps:remove(<<"name">>, LinkConfA)] ]) ), ?assertMatch( @@ -285,7 +285,7 @@ t_config_validations(Config) -> erpc:call(NodeA, emqx_cluster_link_config, update, [[LinkConfA]]) ), LinkConfUnknown = LinkConfA#{ - <<"upstream">> => <<"no-cluster">>, <<"server">> => <<"no-cluster.emqx:31883">> + <<"name">> => <<"no-cluster">>, <<"server">> => <<"no-cluster.emqx:31883">> }, ?assertMatch( {ok, _}, @@ -365,14 +365,14 @@ t_config_update_ds(Config) -> <<"pool_size">> => 1, <<"server">> => <<"localhost:", (integer_to_binary(LPortB))/binary>>, <<"topics">> => [<<"t/test-topic">>, <<"t/test/#">>], - <<"upstream">> => NameB + <<"name">> => NameB }, LinkConfB = #{ <<"enable">> => true, <<"pool_size">> => 1, <<"server">> => <<"localhost:", (integer_to_binary(LPortA))/binary>>, <<"topics">> => [<<"t/test-topic">>, <<"t/test/#">>], - <<"upstream">> => NameA + <<"name">> => NameA }, {ok, SubRef} = snabbkaffe:subscribe( @@ -500,14 +500,14 @@ t_misconfigured_links(Config) -> <<"pool_size">> => 1, <<"server">> => <<"localhost:", (integer_to_binary(LPortB))/binary>>, <<"topics">> => [<<"t/test-topic">>, <<"t/test/#">>], - <<"upstream">> => <<"bad-b-name">> + <<"name">> => <<"bad-b-name">> }, LinkConfB = #{ <<"enable">> => true, <<"pool_size">> => 1, <<"server">> => <<"localhost:", (integer_to_binary(LPortA))/binary>>, <<"topics">> => [<<"t/test-topic">>, <<"t/test/#">>], - <<"upstream">> => NameA + <<"name">> => NameA }, ?assertMatch({ok, _}, erpc:call(NodeB1, emqx_cluster_link_config, update, [[LinkConfB]])), @@ -528,7 +528,7 @@ t_misconfigured_links(Config) -> ), {{ok, _}, {ok, _}} = ?wait_async_action( - erpc:call(NodeA1, emqx_cluster_link_config, update, [[LinkConfA#{<<"upstream">> => NameB}]]), + erpc:call(NodeA1, emqx_cluster_link_config, update, [[LinkConfA#{<<"name">> => NameB}]]), #{ ?snk_kind := clink_route_bootstrap_complete, ?snk_meta := #{node := NodeA1} @@ -554,7 +554,7 @@ t_misconfigured_links(Config) -> LinkConfB#{<<"enable">> => false}, %% An extra dummy link to keep B hook/external_broker registered and be able to %% respond with "link disabled error" for the first disabled link - LinkConfB#{<<"upstream">> => <<"bad-a-name">>} + LinkConfB#{<<"name">> => <<"bad-a-name">>} ] ] ) @@ -562,7 +562,7 @@ t_misconfigured_links(Config) -> ?assertMatch({ok, _}, erpc:call(NodeA1, emqx_cluster_link_config, update, [[]])), {{ok, _}, {ok, _}} = ?wait_async_action( - erpc:call(NodeA1, emqx_cluster_link_config, update, [[LinkConfA#{<<"upstream">> => NameB}]]), + erpc:call(NodeA1, emqx_cluster_link_config, update, [[LinkConfA#{<<"name">> => NameB}]]), #{ ?snk_kind := clink_handshake_error, reason := <<"cluster_link_disabled">>, @@ -579,13 +579,13 @@ t_misconfigured_links(Config) -> ?assertMatch( {ok, _}, erpc:call(NodeB1, emqx_cluster_link_config, update, [ - [LinkConfB#{<<"upstream">> => <<"bad-a-name">>}] + [LinkConfB#{<<"name">> => <<"bad-a-name">>}] ]) ), ?assertMatch({ok, _}, erpc:call(NodeA1, emqx_cluster_link_config, update, [[]])), {{ok, _}, {ok, _}} = ?wait_async_action( - erpc:call(NodeA1, emqx_cluster_link_config, update, [[LinkConfA#{<<"upstream">> => NameB}]]), + erpc:call(NodeA1, emqx_cluster_link_config, update, [[LinkConfA#{<<"name">> => NameB}]]), #{ ?snk_kind := clink_handshake_error, reason := <<"unknown_cluster">>, diff --git a/rel/i18n/emqx_cluster_link_schema.hocon b/rel/i18n/emqx_cluster_link_schema.hocon index 77e4987f7..efd402569 100644 --- a/rel/i18n/emqx_cluster_link_schema.hocon +++ b/rel/i18n/emqx_cluster_link_schema.hocon @@ -12,9 +12,9 @@ enable.desc: """Enable or disable a cluster link. The link is enabled by default, disabling it allows stopping the link without removing its configuration. The link must be enabled on both sides to be operational. Disabling the link should also be done on both clusters in order to free up all associated resources.""" enable.label: "Enable" -upstream.desc: -"""Upstream cluster name. Must be exactly equal to the value of `cluster.name` configured at the remote cluster. Must not be equal to the local cluster.name. All configured cluster link upstream names must be unique.""" -upstream.label: "Upstream Name" +link_name.desc: +"""Linked (remote) cluster name. Must be exactly equal to the value of `cluster.name` configured at the remote cluster. Must not be equal to the local cluster.name. All configured cluster link names must be unique.""" +link_name.label: "Linked Cluster Name" server.desc: """MQTT host and port of the remote EMQX broker."""