diff --git a/apps/emqx_cluster_link/src/emqx_cluster_link_router_syncer.erl b/apps/emqx_cluster_link/src/emqx_cluster_link_router_syncer.erl index dc3903f72..9d35bb812 100644 --- a/apps/emqx_cluster_link/src/emqx_cluster_link_router_syncer.erl +++ b/apps/emqx_cluster_link/src/emqx_cluster_link_router_syncer.erl @@ -389,8 +389,6 @@ process_connect(St = #st{target = TargetCluster, actor = Actor}) -> {ok, ClientPid} -> _ = maybe_deactivate_alarm(St), ok = announce_client(Actor, TargetCluster, ClientPid), - %% TODO: handle subscribe errors - {ok, _, _} = emqtt:subscribe(ClientPid, ?RESP_TOPIC(Actor), ?QOS_1), init_remote_actor(St#st{client = ClientPid}); {error, Reason} -> handle_connect_error(Reason, St) @@ -400,6 +398,8 @@ init_remote_actor( St = #st{target = TargetCluster, client = ClientPid, actor = Actor, incarnation = Incr} ) -> ReqId = emqx_utils_conv:bin(emqx_utils:gen_id(16)), + %% TODO: handle subscribe errors + {ok, _, _} = emqtt:subscribe(ClientPid, ?RESP_TOPIC(Actor), ?QOS_1), Res = ?SAFE_MQTT_PUB( emqx_cluster_link_mqtt:publish_actor_init_sync( ClientPid, ReqId, ?RESP_TOPIC(Actor), TargetCluster, Actor, Incr @@ -428,7 +428,6 @@ post_actor_init( NeedBootstrap ) -> ok = start_syncer(TargetCluster, Actor, Incr), - %% TODO: Heartbeats are currently blocked by bootstrapping. NSt = schedule_heartbeat(St#st{client = ClientPid}), process_bootstrap(NSt, NeedBootstrap). @@ -500,7 +499,8 @@ run_bootstrap(Bootstrap, St) -> %% TODO: Better error handling. case process_bootstrap_batch(Batch, St) of #{} -> - run_bootstrap(NBootstrap, St); + NSt = ensure_bootstrap_heartbeat(St), + run_bootstrap(NBootstrap, NSt); {error, {client, _, _}} -> %% Client has exited, let `reconnect` codepath handle it. St @@ -514,6 +514,17 @@ process_bootstrapped(St = #st{target = TargetCluster, actor = Actor}) -> process_bootstrap_batch(Batch, #st{client = ClientPid, actor = Actor, incarnation = Incarnation}) -> publish_routes(ClientPid, Actor, Incarnation, Batch). 
+%% Sends a heartbeat from inside the bootstrap loop when one is overdue.
+%% `run_bootstrap/2` calls this after each batch for which
+%% `process_bootstrap_batch/2` returned a map, so heartbeats are not held
+%% back until the whole bootstrap has finished.
+%%
+%% Returns `St` untouched while the heartbeat timer is still pending;
+%% otherwise returns whatever `process_heartbeat/1` returns.
+ensure_bootstrap_heartbeat(St = #st{heartbeat_timer = HeartbeatTimer}) ->
+    case erlang:read_timer(HeartbeatTimer) of
+        Remaining when Remaining =/= false ->
+            %% Timer has not fired yet: nothing to do.
+            St;
+        false ->
+            %% `erlang:read_timer/1` yields `false` for a timer that is no
+            %% longer active.
+            %% NOTE(review): presumably this call also discards a timeout
+            %% message the expired timer has already delivered, so the same
+            %% heartbeat is not handled twice -- confirm against
+            %% `emqx_utils:cancel_timer/1`.
+            ok = emqx_utils:cancel_timer(HeartbeatTimer),
+            %% NOTE(review): `St` still carries the expired reference here;
+            %% confirm that `process_heartbeat/1` installs a fresh timer.
+            process_heartbeat(St)
+    end.
+
+%%
+
 error_reason({error, Reason}) ->
     Reason;
 error_reason(OtherErr) ->