From d282c61120dd1bc9fd019713c3962c7be2aea667 Mon Sep 17 00:00:00 2001 From: Serge Tupchii Date: Fri, 7 Jun 2024 19:17:01 +0300 Subject: [PATCH] feat(clusterlink): update only necessary resources when a link config is changed --- .../src/emqx_cluster_link_config.erl | 49 +++++++++++++++---- 1 file changed, 40 insertions(+), 9 deletions(-) diff --git a/apps/emqx_cluster_link/src/emqx_cluster_link_config.erl b/apps/emqx_cluster_link/src/emqx_cluster_link_config.erl index 28344cd7e..c28755ac1 100644 --- a/apps/emqx_cluster_link/src/emqx_cluster_link_config.erl +++ b/apps/emqx_cluster_link/src/emqx_cluster_link_config.erl @@ -19,6 +19,13 @@ -define(DEFAULT_ACTOR_TTL, 3_000). -endif. +-define(COMMON_FIELDS, [username, password, clientid, server, ssl]). +%% NOTE: retry_interval, max_inflight may be used for router syncer client as well, +%% but for now they are not. +-define(MSG_RES_FIELDS, [resource_opts, pool_size, retry_interval, max_inflight]). +%% Excludes a special hidden `ps_actor_incarnation` field. +-define(ACTOR_FIELDS, [topics]). + -export([ %% General update/1, @@ -196,19 +203,43 @@ update_links(LinksConf) -> [update_link(Link) || Link <- LinksConf]. update_link({OldLinkConf, #{enable := true, upstream := Name} = NewLinkConf}) -> - _ = ensure_actor_stopped(Name), - {ok, _Pid} = emqx_cluster_link_sup:ensure_actor(NewLinkConf), - %% TODO: if only msg_fwd resource related config is changed, - %% we can skip actor reincarnation/restart. - ok = update_msg_fwd_resource(OldLinkConf, NewLinkConf), - ok; + case what_is_changed(OldLinkConf, NewLinkConf) of + both -> + _ = ensure_actor_stopped(Name), + {ok, _Pid} = emqx_cluster_link_sup:ensure_actor(NewLinkConf), + ok = update_msg_fwd_resource(OldLinkConf, NewLinkConf); + actor -> + _ = ensure_actor_stopped(Name), + {ok, _Pid} = emqx_cluster_link_sup:ensure_actor(NewLinkConf), + ok; + msg_resource -> + ok = update_msg_fwd_resource(OldLinkConf, NewLinkConf) + end; update_link({_OldLinkConf, #{enable := false, upstream := Name} = _NewLinkConf}) -> _ = emqx_cluster_link_mqtt:remove_msg_fwd_resource(Name), ensure_actor_stopped(Name). -update_msg_fwd_resource(#{pool_size := Old}, #{pool_size := Old} = NewConf) -> - {ok, _} = emqx_cluster_link_mqtt:ensure_msg_fwd_resource(NewConf), - ok; +what_is_changed(OldLink, NewLink) -> + CommonChanged = are_fields_changed(?COMMON_FIELDS, OldLink, NewLink), + ActorChanged = are_fields_changed(?ACTOR_FIELDS, OldLink, NewLink), + MsgResChanged = are_fields_changed(?MSG_RES_FIELDS, OldLink, NewLink), + AllChanged = ActorChanged andalso MsgResChanged, + case CommonChanged orelse AllChanged of + true -> + both; + false -> + %% This function is only applicable when it's certain that link conf is changed, + %% so if resource fields are the same, + %% then some other actor-related fields are definitely changed. + case MsgResChanged of + true -> msg_resource; + false -> actor + end + end. + +are_fields_changed(Fields, OldLink, NewLink) -> + maps:with(Fields, OldLink) =/= maps:with(Fields, NewLink). + update_msg_fwd_resource(_, #{upstream := Name} = NewConf) -> _ = emqx_cluster_link_mqtt:remove_msg_fwd_resource(Name), {ok, _} = emqx_cluster_link_mqtt:ensure_msg_fwd_resource(NewConf),