fix(cluster-link): ensure replication actor bootstraps do heartbeats

This commit is contained in:
Andrew Mayorov 2024-05-20 11:44:20 +02:00 committed by Serge Tupchii
parent d4b449c6e1
commit 5771a41a32
1 changed files with 15 additions and 4 deletions

View File

@ -389,8 +389,6 @@ process_connect(St = #st{target = TargetCluster, actor = Actor}) ->
{ok, ClientPid} -> {ok, ClientPid} ->
_ = maybe_deactivate_alarm(St), _ = maybe_deactivate_alarm(St),
ok = announce_client(Actor, TargetCluster, ClientPid), ok = announce_client(Actor, TargetCluster, ClientPid),
%% TODO: handle subscribe errors
{ok, _, _} = emqtt:subscribe(ClientPid, ?RESP_TOPIC(Actor), ?QOS_1),
init_remote_actor(St#st{client = ClientPid}); init_remote_actor(St#st{client = ClientPid});
{error, Reason} -> {error, Reason} ->
handle_connect_error(Reason, St) handle_connect_error(Reason, St)
@ -400,6 +398,8 @@ init_remote_actor(
St = #st{target = TargetCluster, client = ClientPid, actor = Actor, incarnation = Incr} St = #st{target = TargetCluster, client = ClientPid, actor = Actor, incarnation = Incr}
) -> ) ->
ReqId = emqx_utils_conv:bin(emqx_utils:gen_id(16)), ReqId = emqx_utils_conv:bin(emqx_utils:gen_id(16)),
%% TODO: handle subscribe errors
{ok, _, _} = emqtt:subscribe(ClientPid, ?RESP_TOPIC(Actor), ?QOS_1),
Res = ?SAFE_MQTT_PUB( Res = ?SAFE_MQTT_PUB(
emqx_cluster_link_mqtt:publish_actor_init_sync( emqx_cluster_link_mqtt:publish_actor_init_sync(
ClientPid, ReqId, ?RESP_TOPIC(Actor), TargetCluster, Actor, Incr ClientPid, ReqId, ?RESP_TOPIC(Actor), TargetCluster, Actor, Incr
@ -428,7 +428,6 @@ post_actor_init(
NeedBootstrap NeedBootstrap
) -> ) ->
ok = start_syncer(TargetCluster, Actor, Incr), ok = start_syncer(TargetCluster, Actor, Incr),
%% TODO: Heartbeats are currently blocked by bootstrapping.
NSt = schedule_heartbeat(St#st{client = ClientPid}), NSt = schedule_heartbeat(St#st{client = ClientPid}),
process_bootstrap(NSt, NeedBootstrap). process_bootstrap(NSt, NeedBootstrap).
@ -500,7 +499,8 @@ run_bootstrap(Bootstrap, St) ->
%% TODO: Better error handling. %% TODO: Better error handling.
case process_bootstrap_batch(Batch, St) of case process_bootstrap_batch(Batch, St) of
#{} -> #{} ->
run_bootstrap(NBootstrap, St); NSt = ensure_bootstrap_heartbeat(St),
run_bootstrap(NBootstrap, NSt);
{error, {client, _, _}} -> {error, {client, _, _}} ->
%% Client has exited, let `reconnect` codepath handle it. %% Client has exited, let `reconnect` codepath handle it.
St St
@ -514,6 +514,17 @@ process_bootstrapped(St = #st{target = TargetCluster, actor = Actor}) ->
process_bootstrap_batch(Batch, #st{client = ClientPid, actor = Actor, incarnation = Incarnation}) -> process_bootstrap_batch(Batch, #st{client = ClientPid, actor = Actor, incarnation = Incarnation}) ->
publish_routes(ClientPid, Actor, Incarnation, Batch). publish_routes(ClientPid, Actor, Incarnation, Batch).
ensure_bootstrap_heartbeat(St = #st{heartbeat_timer = TRef}) ->
case erlang:read_timer(TRef) of
false ->
ok = emqx_utils:cancel_timer(TRef),
process_heartbeat(St);
_TimeLeft ->
St
end.
%%
error_reason({error, Reason}) -> error_reason({error, Reason}) ->
Reason; Reason;
error_reason(OtherErr) -> error_reason(OtherErr) ->