diff --git a/apps/emqx_cluster_link/src/emqx_cluster_link_router_syncer.erl b/apps/emqx_cluster_link/src/emqx_cluster_link_router_syncer.erl index 0571ba099..3c87b9e37 100644 --- a/apps/emqx_cluster_link/src/emqx_cluster_link_router_syncer.erl +++ b/apps/emqx_cluster_link/src/emqx_cluster_link_router_syncer.erl @@ -296,7 +296,9 @@ syncer_spec(ChildID, Actor, Incarnation, SyncerRef, ClientName) -> reconnect_timer :: reference(), actor_init_req_id :: binary(), actor_init_timer :: reference(), - remote_actor_info :: undefined | map() + remote_actor_info :: undefined | map(), + status :: connecting | connected | disconnected, + error :: undefined | term() }). mk_state(TargetCluster, Actor, Incarnation) -> @@ -304,7 +306,8 @@ mk_state(TargetCluster, Actor, Incarnation) -> target = TargetCluster, actor = Actor, incarnation = Incarnation, - bootstrapped = false + bootstrapped = false, + status = connecting }. init_actor(State = #st{}) -> @@ -334,18 +337,20 @@ handle_info( }, case Res of ok -> - {noreply, post_actor_init(St1, NeedBootstrap)}; + _ = maybe_deactivate_alarm(St), + {noreply, + post_actor_init(St1#st{error = undefined, status = connected}, NeedBootstrap)}; Error -> + Reason = error_reason(Error), ?SLOG(error, #{ msg => "failed_to_init_link", - reason => Error, + reason => Reason, target_cluster => St#st.target, actor => St#st.actor, remote_link_proto_ver => maps:get(proto_ver, AckInfoMap, undefined) }), - %% TODO: It doesn't fit permanent workers/one_for_all restart/strategy. - %% The actor may be kept alive with some error status instead (waiting for a user intervention to fix it)? - {stop, {shutdown, Error}, St1} + _ = maybe_alarm(Reason, St1), + {noreply, St1#st{error = Reason, status = disconnected}} end; handle_info({publish, #{}}, St) -> {noreply, St}; @@ -357,7 +362,10 @@ handle_info({timeout, TRef, actor_reinit}, St = #st{actor_init_timer = TRef}) -> target_cluster => St#st.target, actor => St#st.actor }), - {noreply, init_remote_actor(St#st{reconnect_timer = undefined})}; + Reason = init_timeout, + _ = maybe_alarm(Reason, St), + {noreply, + init_remote_actor(St#st{reconnect_timer = undefined, status = disconnected, error = Reason})}; %% Stale timeout. handle_info({timeout, _, _}, St) -> {noreply, St}; @@ -371,6 +379,7 @@ terminate(_Reason, _State) -> process_connect(St = #st{target = TargetCluster, actor = Actor}) -> case start_link_client(TargetCluster, Actor) of {ok, ClientPid} -> + _ = maybe_deactivate_alarm(St), ok = announce_client(Actor, TargetCluster, ClientPid), %% TODO: handle subscribe errors {ok, _, _} = emqtt:subscribe(ClientPid, ?RESP_TOPIC(Actor), ?QOS_1), @@ -389,19 +398,22 @@ init_remote_actor( ), ClientPid ), - case Res of - ok -> - ok; - {error, Reason} -> - ?SLOG(error, #{ - msg => "failed_to_init_remote_actor", - reason => Reason, - target_cluster => TargetCluster, - actor => Actor - }) - end, + St1 = + case Res of + ok -> + St#st{status = connecting}; + {error, Reason} -> + ?SLOG(error, #{ + msg => "cluster_link_init_failed", + reason => Reason, + target_cluster => TargetCluster, + actor => Actor + }), + _ = maybe_alarm(Reason, St), + St#st{error = Reason, status = disconnected} + end, TRef = erlang:start_timer(?ACTOR_REINIT_TIMEOUT, self(), actor_reinit), - St#st{actor_init_req_id = ReqId, actor_init_timer = TRef}. + St1#st{actor_init_req_id = ReqId, actor_init_timer = TRef}. post_actor_init( St = #st{client = ClientPid, target = TargetCluster, actor = Actor, incarnation = Incr}, @@ -410,16 +422,28 @@ post_actor_init( ok = start_syncer(TargetCluster, Actor, Incr), process_bootstrap(St#st{client = ClientPid}, NeedBootstrap). -handle_connect_error(_Reason, St) -> - %% TODO: logs +handle_connect_error(Reason, St) -> + ?SLOG(error, #{ + msg => "cluster_link_connection_failed", + reason => Reason, + target_cluster => St#st.target, + actor => St#st.actor + }), TRef = erlang:start_timer(?RECONNECT_TIMEOUT, self(), reconnect), - St#st{reconnect_timer = TRef}. + _ = maybe_alarm(Reason, St), + St#st{reconnect_timer = TRef, error = Reason, status = disconnected}. -handle_client_down(_Reason, St = #st{target = TargetCluster, actor = Actor}) -> - %% TODO: logs +handle_client_down(Reason, St = #st{target = TargetCluster, actor = Actor}) -> + ?SLOG(error, #{ + msg => "cluster_link_connection_failed", + reason => Reason, + target_cluster => St#st.target, + actor => St#st.actor + }), %% TODO: syncer may be already down due to one_for_all strategy ok = close_syncer(TargetCluster, Actor), - process_connect(St#st{client = undefined}). + _ = maybe_alarm(Reason, St), + process_connect(St#st{client = undefined, error = Reason, status = connecting}). process_bootstrap(St = #st{bootstrapped = false}, _NeedBootstrap) -> run_bootstrap(St); @@ -471,3 +495,30 @@ process_bootstrapped(St = #st{target = TargetCluster, actor = Actor}) -> process_bootstrap_batch(Batch, #st{client = ClientPid, actor = Actor, incarnation = Incarnation}) -> publish_routes(ClientPid, Actor, Incarnation, Batch). + +error_reason({error, Reason}) -> + Reason; +error_reason(OtherErr) -> + OtherErr. + +%% Assume that alarm is already active +maybe_alarm(Error, #st{error = Error}) -> + ok; +maybe_alarm(Error, St) -> + HrError = emqx_utils:readable_error_msg(error_reason(Error)), + Name = link_name(St), + emqx_alarm:safe_activate( + Name, + #{custer_link => Name, reason => cluster_link_down}, + <<"cluster link down: ", HrError/binary>> + ). + +maybe_deactivate_alarm(#st{error = undefined}) -> + ok; +maybe_deactivate_alarm(St) -> + emqx_alarm:safe_deactivate(link_name(St)). + +link_name(#st{actor = ?PS_ACTOR = Actor, target = Target}) -> + <<"cluster_link:", Target/binary, ":", (get_actor_id())/binary, ":", Actor/binary>>; +link_name(#st{actor = Actor, target = Target}) -> + <<"cluster_link:", Target/binary, ":", Actor/binary>>.