diff --git a/apps/emqx_cluster_link/src/emqx_cluster_link.erl b/apps/emqx_cluster_link/src/emqx_cluster_link.erl index 1d196aa91..7a64b0ff7 100644 --- a/apps/emqx_cluster_link/src/emqx_cluster_link.erl +++ b/apps/emqx_cluster_link/src/emqx_cluster_link.erl @@ -170,10 +170,7 @@ actor_init( case MyClusterName of TargetCluster -> Env = #{timestamp => erlang:system_time(millisecond)}, - {ok, _} = emqx_cluster_link_extrouter:actor_init( - ClusterName, Actor, Incr, Env - ), - ok; + emqx_cluster_link_extrouter:actor_init(ClusterName, Actor, Incr, Env); _ -> %% The remote cluster uses a different name to refer to this cluster ?SLOG(error, #{ diff --git a/apps/emqx_cluster_link/src/emqx_cluster_link_extrouter.erl b/apps/emqx_cluster_link/src/emqx_cluster_link_extrouter.erl index bbec844df..9b6cf07e2 100644 --- a/apps/emqx_cluster_link/src/emqx_cluster_link_extrouter.erl +++ b/apps/emqx_cluster_link/src/emqx_cluster_link_extrouter.erl @@ -19,7 +19,8 @@ actor_state/3, actor_apply_operation/2, actor_apply_operation/3, - actor_gc/1 + actor_gc/1, + is_present_incarnation/1 ]). %% Internal API @@ -140,7 +141,8 @@ match_to_route(M) -> cluster :: cluster(), actor :: actor(), incarnation :: incarnation(), - lane :: lane() | undefined + lane :: lane() | undefined, + extra :: map() }). -type state() :: #state{}. @@ -159,6 +161,12 @@ actor_init(Cluster, Actor, Incarnation, Env = #{timestamp := Now}) -> actor_init(Cluster, Actor, Incarnation, Env) end. +-spec is_present_incarnation(state()) -> boolean(). +is_present_incarnation(#state{extra = #{is_present_incarnation := IsNew}}) -> + IsNew; +is_present_incarnation(_State) -> + false. + mnesia_actor_init(Cluster, Actor, Incarnation, TS) -> %% NOTE %% We perform this heavy-weight transaction only in the case of a new route @@ -173,7 +181,7 @@ mnesia_actor_init(Cluster, Actor, Incarnation, TS) -> case mnesia:read(?EXTROUTE_ACTOR_TAB, ActorID, write) of [#actor{incarnation = Incarnation, lane = Lane} = Rec] -> ok = mnesia:write(?EXTROUTE_ACTOR_TAB, Rec#actor{until = bump_actor_ttl(TS)}, write), - {ok, State#state{lane = Lane}}; + {ok, State#state{lane = Lane, extra = #{is_present_incarnation => true}}}; [] -> Lane = mnesia_assign_lane(Cluster), Rec = #actor{ @@ -183,7 +191,7 @@ mnesia_actor_init(Cluster, Actor, Incarnation, TS) -> until = bump_actor_ttl(TS) }, ok = mnesia:write(?EXTROUTE_ACTOR_TAB, Rec, write), - {ok, State#state{lane = Lane}}; + {ok, State#state{lane = Lane, extra = #{is_present_incarnation => false}}}; [#actor{incarnation = Outdated} = Rec] when Incarnation > Outdated -> {reincarnate, Rec}; [#actor{incarnation = Newer}] -> @@ -321,7 +329,7 @@ mnesia_clean_incarnation(#actor{id = Actor, incarnation = Incarnation, lane = La clean_lane(Lane) -> ets:foldl( fun(#extroute{entry = Entry, mcounter = MCounter}, _) -> - apply_operation(Entry, MCounter, del, Lane) + apply_operation(Entry, MCounter, delete, Lane) end, 0, ?EXTROUTE_TAB diff --git a/apps/emqx_cluster_link/src/emqx_cluster_link_mqtt.erl b/apps/emqx_cluster_link/src/emqx_cluster_link_mqtt.erl index 3ebfd6de6..db4e39224 100644 --- a/apps/emqx_cluster_link/src/emqx_cluster_link_mqtt.erl +++ b/apps/emqx_cluster_link/src/emqx_cluster_link_mqtt.erl @@ -71,6 +71,7 @@ -define(F_TARGET_CLUSTER, 13). -define(F_PROTO_VER, 14). -define(F_RESULT, 15). +-define(F_NEED_BOOTSTRAP, 16). -define(ROUTE_DELETE, 100). @@ -279,18 +280,29 @@ actor_init_ack_resp_msg(Actor, InitRes, ReqId, RespTopic) -> Payload = #{ ?F_OPERATION => ?OP_ACTOR_INIT_ACK, ?F_PROTO_VER => ?PROTO_VER, - ?F_ACTOR => Actor, - ?F_RESULT => InitRes + ?F_ACTOR => Actor }, + Payload1 = with_res_and_bootstrap(Payload, InitRes), emqx_message:make( undefined, ?QOS_1, RespTopic, - ?ENCODE(Payload), + ?ENCODE(Payload1), #{}, #{properties => #{'Correlation-Data' => ReqId}} ). +with_res_and_bootstrap(Payload, {ok, ActorState}) -> + Payload#{ + ?F_RESULT => ok, + ?F_NEED_BOOTSTRAP => not emqx_cluster_link_extrouter:is_present_incarnation(ActorState) + }; +with_res_and_bootstrap(Payload, Error) -> + Payload#{ + ?F_RESULT => Error, + ?F_NEED_BOOTSTRAP => false + }. + publish_route_sync(ClientPid, Actor, Incarnation, Updates) -> PubTopic = ?ROUTE_TOPIC, Payload = #{ @@ -339,9 +351,12 @@ decode_resp1(#{ ?F_OPERATION := ?OP_ACTOR_INIT_ACK, ?F_ACTOR := Actor, ?F_PROTO_VER := ProtoVer, - ?F_RESULT := InitResult + ?F_RESULT := InitResult, + ?F_NEED_BOOTSTRAP := NeedBootstrap }) -> - {actor_init_ack, #{actor => Actor, result => InitResult, proto_ver => ProtoVer}}. + {actor_init_ack, #{ + actor => Actor, result => InitResult, proto_ver => ProtoVer, need_bootstrap => NeedBootstrap + }}. decode_forwarded_msg(Payload) -> case ?DECODE(Payload) of diff --git a/apps/emqx_cluster_link/src/emqx_cluster_link_router_syncer.erl b/apps/emqx_cluster_link/src/emqx_cluster_link_router_syncer.erl index 7d0a9db25..0571ba099 100644 --- a/apps/emqx_cluster_link/src/emqx_cluster_link_router_syncer.erl +++ b/apps/emqx_cluster_link/src/emqx_cluster_link_router_syncer.erl @@ -164,14 +164,6 @@ refine_client_options(Options = #{clientid := ClientID}, Actor) -> retry_interval => 0 }. -client_session_present(ClientPid) -> - Info = emqtt:info(ClientPid), - %% FIXME: waitnig for emqtt release that fixes session_present type (must be a boolean) - case proplists:get_value(session_present, Info, 0) of - 0 -> false; - 1 -> true - end. - announce_client(Actor, TargetCluster, Pid) -> Name = case Actor of @@ -334,13 +326,15 @@ handle_info( {publish, #{payload := Payload, properties := #{'Correlation-Data' := ReqId}}}, St = #st{actor_init_req_id = ReqId} ) -> - {actor_init_ack, #{result := Res} = AckInfoMap} = emqx_cluster_link_mqtt:decode_resp(Payload), + {actor_init_ack, #{result := Res, need_bootstrap := NeedBootstrap} = AckInfoMap} = emqx_cluster_link_mqtt:decode_resp( + Payload + ), St1 = St#st{ actor_init_req_id = undefined, actor_init_timer = undefined, remote_actor_info = AckInfoMap }, case Res of ok -> - {noreply, post_actor_init(St1)}; + {noreply, post_actor_init(St1, NeedBootstrap)}; Error -> ?SLOG(error, #{ msg => "failed_to_init_link", @@ -410,10 +404,11 @@ init_remote_actor( St#st{actor_init_req_id = ReqId, actor_init_timer = TRef}. post_actor_init( - St = #st{client = ClientPid, target = TargetCluster, actor = Actor, incarnation = Incr} + St = #st{client = ClientPid, target = TargetCluster, actor = Actor, incarnation = Incr}, + NeedBootstrap ) -> ok = start_syncer(TargetCluster, Actor, Incr), - process_bootstrap(St#st{client = ClientPid}). + process_bootstrap(St#st{client = ClientPid}, NeedBootstrap). handle_connect_error(_Reason, St) -> %% TODO: logs @@ -426,14 +421,14 @@ handle_client_down(_Reason, St = #st{target = TargetCluster, actor = Actor}) -> ok = close_syncer(TargetCluster, Actor), process_connect(St#st{client = undefined}). -process_bootstrap(St = #st{bootstrapped = false}) -> +process_bootstrap(St = #st{bootstrapped = false}, _NeedBootstrap) -> run_bootstrap(St); -process_bootstrap(St = #st{client = ClientPid, bootstrapped = true}) -> - case client_session_present(ClientPid) of +process_bootstrap(St = #st{bootstrapped = true}, NeedBootstrap) -> + case NeedBootstrap of true -> - process_bootstrapped(St); + run_bootstrap(St); false -> - run_bootstrap(St) + process_bootstrapped(St) end. %% Bootstrapping.