feat(clusterlink): update only necessary resources when a link config is changed

This commit is contained in:
Serge Tupchii 2024-06-07 19:17:01 +03:00
parent 5304ca1563
commit d282c61120
1 changed files with 40 additions and 9 deletions

View File

@ -19,6 +19,13 @@
-define(DEFAULT_ACTOR_TTL, 3_000). -define(DEFAULT_ACTOR_TTL, 3_000).
-endif. -endif.
-define(COMMON_FIELDS, [username, password, clientid, server, ssl]).
%% NOTE: retry_interval, max_inflight may be used for router syncer client as well,
%% but for now they are not.
-define(MSG_RES_FIELDS, [resource_opts, pool_size, retry_interval, max_inflight]).
%% Excludes a special hidden `ps_actor_incarnation` field.
-define(ACTOR_FIELDS, [topics]).
-export([ -export([
%% General %% General
update/1, update/1,
@ -196,19 +203,43 @@ update_links(LinksConf) ->
[update_link(Link) || Link <- LinksConf]. [update_link(Link) || Link <- LinksConf].
update_link({OldLinkConf, #{enable := true, upstream := Name} = NewLinkConf}) -> update_link({OldLinkConf, #{enable := true, upstream := Name} = NewLinkConf}) ->
case what_is_changed(OldLinkConf, NewLinkConf) of
both ->
_ = ensure_actor_stopped(Name),
{ok, _Pid} = emqx_cluster_link_sup:ensure_actor(NewLinkConf),
ok = update_msg_fwd_resource(OldLinkConf, NewLinkConf);
actor ->
_ = ensure_actor_stopped(Name), _ = ensure_actor_stopped(Name),
{ok, _Pid} = emqx_cluster_link_sup:ensure_actor(NewLinkConf), {ok, _Pid} = emqx_cluster_link_sup:ensure_actor(NewLinkConf),
%% TODO: if only msg_fwd resource related config is changed,
%% we can skip actor reincarnation/restart.
ok = update_msg_fwd_resource(OldLinkConf, NewLinkConf),
ok; ok;
msg_resource ->
ok = update_msg_fwd_resource(OldLinkConf, NewLinkConf)
end;
update_link({_OldLinkConf, #{enable := false, upstream := Name} = _NewLinkConf}) -> update_link({_OldLinkConf, #{enable := false, upstream := Name} = _NewLinkConf}) ->
_ = emqx_cluster_link_mqtt:remove_msg_fwd_resource(Name), _ = emqx_cluster_link_mqtt:remove_msg_fwd_resource(Name),
ensure_actor_stopped(Name). ensure_actor_stopped(Name).
update_msg_fwd_resource(#{pool_size := Old}, #{pool_size := Old} = NewConf) -> what_is_changed(OldLink, NewLink) ->
{ok, _} = emqx_cluster_link_mqtt:ensure_msg_fwd_resource(NewConf), CommonChanged = are_fields_changed(?COMMON_FIELDS, OldLink, NewLink),
ok; ActorChanged = are_fields_changed(?ACTOR_FIELDS, OldLink, NewLink),
MsgResChanged = are_fields_changed(?MSG_RES_FIELDS, OldLink, NewLink),
AllChanged = ActorChanged andalso MsgResChanged,
case CommonChanged orelse AllChanged of
true ->
both;
false ->
%% This function is only applicable when it's certain that link conf is changed,
%% so if resource fields are the same,
%% then some other actor-related fields are definitely changed.
case MsgResChanged of
true -> msg_resource;
false -> actor
end
end.
are_fields_changed(Fields, OldLink, NewLink) ->
maps:with(Fields, OldLink) =/= maps:with(Fields, NewLink).
update_msg_fwd_resource(_, #{upstream := Name} = NewConf) -> update_msg_fwd_resource(_, #{upstream := Name} = NewConf) ->
_ = emqx_cluster_link_mqtt:remove_msg_fwd_resource(Name), _ = emqx_cluster_link_mqtt:remove_msg_fwd_resource(Name),
{ok, _} = emqx_cluster_link_mqtt:ensure_msg_fwd_resource(NewConf), {ok, _} = emqx_cluster_link_mqtt:ensure_msg_fwd_resource(NewConf),