From 21711c6e0d0f0e68d9e6730c8452a6b3637fd080 Mon Sep 17 00:00:00 2001 From: Serge Tupchii Date: Tue, 28 May 2024 17:55:36 +0300 Subject: [PATCH] fix(clusterlink): communicate bootstrap requirements via actor handshake `session_present` flag is not reliable to decide whether bootstrap is needed if durable sessions is enabled. In this case, the client session may survive cluster restart while all the external routes are lost, as they are not persistent. --- .../src/emqx_cluster_link.erl | 5 +--- .../src/emqx_cluster_link_extrouter.erl | 18 ++++++++---- .../src/emqx_cluster_link_mqtt.erl | 25 ++++++++++++---- .../src/emqx_cluster_link_router_syncer.erl | 29 ++++++++----------- 4 files changed, 46 insertions(+), 31 deletions(-) diff --git a/apps/emqx_cluster_link/src/emqx_cluster_link.erl b/apps/emqx_cluster_link/src/emqx_cluster_link.erl index 1d196aa91..7a64b0ff7 100644 --- a/apps/emqx_cluster_link/src/emqx_cluster_link.erl +++ b/apps/emqx_cluster_link/src/emqx_cluster_link.erl @@ -170,10 +170,7 @@ actor_init( case MyClusterName of TargetCluster -> Env = #{timestamp => erlang:system_time(millisecond)}, - {ok, _} = emqx_cluster_link_extrouter:actor_init( - ClusterName, Actor, Incr, Env - ), - ok; + emqx_cluster_link_extrouter:actor_init(ClusterName, Actor, Incr, Env); _ -> %% The remote cluster uses a different name to refer to this cluster ?SLOG(error, #{ diff --git a/apps/emqx_cluster_link/src/emqx_cluster_link_extrouter.erl b/apps/emqx_cluster_link/src/emqx_cluster_link_extrouter.erl index bbec844df..9b6cf07e2 100644 --- a/apps/emqx_cluster_link/src/emqx_cluster_link_extrouter.erl +++ b/apps/emqx_cluster_link/src/emqx_cluster_link_extrouter.erl @@ -19,7 +19,8 @@ actor_state/3, actor_apply_operation/2, actor_apply_operation/3, - actor_gc/1 + actor_gc/1, + is_present_incarnation/1 ]). %% Internal API @@ -140,7 +141,8 @@ match_to_route(M) -> cluster :: cluster(), actor :: actor(), incarnation :: incarnation(), - lane :: lane() | undefined + lane :: lane() | undefined, + extra :: map() }). -type state() :: #state{}. @@ -159,6 +161,12 @@ actor_init(Cluster, Actor, Incarnation, Env = #{timestamp := Now}) -> actor_init(Cluster, Actor, Incarnation, Env) end. +-spec is_present_incarnation(state()) -> boolean(). +is_present_incarnation(#state{extra = #{is_present_incarnation := IsNew}}) -> + IsNew; +is_present_incarnation(_State) -> + false. + mnesia_actor_init(Cluster, Actor, Incarnation, TS) -> %% NOTE %% We perform this heavy-weight transaction only in the case of a new route @@ -173,7 +181,7 @@ mnesia_actor_init(Cluster, Actor, Incarnation, TS) -> case mnesia:read(?EXTROUTE_ACTOR_TAB, ActorID, write) of [#actor{incarnation = Incarnation, lane = Lane} = Rec] -> ok = mnesia:write(?EXTROUTE_ACTOR_TAB, Rec#actor{until = bump_actor_ttl(TS)}, write), - {ok, State#state{lane = Lane}}; + {ok, State#state{lane = Lane, extra = #{is_present_incarnation => true}}}; [] -> Lane = mnesia_assign_lane(Cluster), Rec = #actor{ @@ -183,7 +191,7 @@ mnesia_actor_init(Cluster, Actor, Incarnation, TS) -> until = bump_actor_ttl(TS) }, ok = mnesia:write(?EXTROUTE_ACTOR_TAB, Rec, write), - {ok, State#state{lane = Lane}}; + {ok, State#state{lane = Lane, extra = #{is_present_incarnation => false}}}; [#actor{incarnation = Outdated} = Rec] when Incarnation > Outdated -> {reincarnate, Rec}; [#actor{incarnation = Newer}] -> @@ -321,7 +329,7 @@ mnesia_clean_incarnation(#actor{id = Actor, incarnation = Incarnation, lane = La clean_lane(Lane) -> ets:foldl( fun(#extroute{entry = Entry, mcounter = MCounter}, _) -> - apply_operation(Entry, MCounter, del, Lane) + apply_operation(Entry, MCounter, delete, Lane) end, 0, ?EXTROUTE_TAB diff --git a/apps/emqx_cluster_link/src/emqx_cluster_link_mqtt.erl b/apps/emqx_cluster_link/src/emqx_cluster_link_mqtt.erl index 3ebfd6de6..db4e39224 100644 --- a/apps/emqx_cluster_link/src/emqx_cluster_link_mqtt.erl +++ b/apps/emqx_cluster_link/src/emqx_cluster_link_mqtt.erl @@ -71,6 +71,7 @@ -define(F_TARGET_CLUSTER, 13). -define(F_PROTO_VER, 14). -define(F_RESULT, 15). +-define(F_NEED_BOOTSTRAP, 16). -define(ROUTE_DELETE, 100). @@ -279,18 +280,29 @@ actor_init_ack_resp_msg(Actor, InitRes, ReqId, RespTopic) -> Payload = #{ ?F_OPERATION => ?OP_ACTOR_INIT_ACK, ?F_PROTO_VER => ?PROTO_VER, - ?F_ACTOR => Actor, - ?F_RESULT => InitRes + ?F_ACTOR => Actor }, + Payload1 = with_res_and_bootstrap(Payload, InitRes), emqx_message:make( undefined, ?QOS_1, RespTopic, - ?ENCODE(Payload), + ?ENCODE(Payload1), #{}, #{properties => #{'Correlation-Data' => ReqId}} ). +with_res_and_bootstrap(Payload, {ok, ActorState}) -> + Payload#{ + ?F_RESULT => ok, + ?F_NEED_BOOTSTRAP => not emqx_cluster_link_extrouter:is_present_incarnation(ActorState) + }; +with_res_and_bootstrap(Payload, Error) -> + Payload#{ + ?F_RESULT => Error, + ?F_NEED_BOOTSTRAP => false + }. + publish_route_sync(ClientPid, Actor, Incarnation, Updates) -> PubTopic = ?ROUTE_TOPIC, Payload = #{ @@ -339,9 +351,12 @@ decode_resp1(#{ ?F_OPERATION := ?OP_ACTOR_INIT_ACK, ?F_ACTOR := Actor, ?F_PROTO_VER := ProtoVer, - ?F_RESULT := InitResult + ?F_RESULT := InitResult, + ?F_NEED_BOOTSTRAP := NeedBootstrap }) -> - {actor_init_ack, #{actor => Actor, result => InitResult, proto_ver => ProtoVer}}. + {actor_init_ack, #{ + actor => Actor, result => InitResult, proto_ver => ProtoVer, need_bootstrap => NeedBootstrap + }}. decode_forwarded_msg(Payload) -> case ?DECODE(Payload) of diff --git a/apps/emqx_cluster_link/src/emqx_cluster_link_router_syncer.erl b/apps/emqx_cluster_link/src/emqx_cluster_link_router_syncer.erl index 7d0a9db25..0571ba099 100644 --- a/apps/emqx_cluster_link/src/emqx_cluster_link_router_syncer.erl +++ b/apps/emqx_cluster_link/src/emqx_cluster_link_router_syncer.erl @@ -164,14 +164,6 @@ refine_client_options(Options = #{clientid := ClientID}, Actor) -> retry_interval => 0 }. -client_session_present(ClientPid) -> - Info = emqtt:info(ClientPid), - %% FIXME: waitnig for emqtt release that fixes session_present type (must be a boolean) - case proplists:get_value(session_present, Info, 0) of - 0 -> false; - 1 -> true - end. - announce_client(Actor, TargetCluster, Pid) -> Name = case Actor of @@ -334,13 +326,15 @@ handle_info( {publish, #{payload := Payload, properties := #{'Correlation-Data' := ReqId}}}, St = #st{actor_init_req_id = ReqId} ) -> - {actor_init_ack, #{result := Res} = AckInfoMap} = emqx_cluster_link_mqtt:decode_resp(Payload), + {actor_init_ack, #{result := Res, need_bootstrap := NeedBootstrap} = AckInfoMap} = emqx_cluster_link_mqtt:decode_resp( + Payload + ), St1 = St#st{ actor_init_req_id = undefined, actor_init_timer = undefined, remote_actor_info = AckInfoMap }, case Res of ok -> - {noreply, post_actor_init(St1)}; + {noreply, post_actor_init(St1, NeedBootstrap)}; Error -> ?SLOG(error, #{ msg => "failed_to_init_link", @@ -410,10 +404,11 @@ init_remote_actor( St#st{actor_init_req_id = ReqId, actor_init_timer = TRef}. post_actor_init( - St = #st{client = ClientPid, target = TargetCluster, actor = Actor, incarnation = Incr} + St = #st{client = ClientPid, target = TargetCluster, actor = Actor, incarnation = Incr}, + NeedBootstrap ) -> ok = start_syncer(TargetCluster, Actor, Incr), - process_bootstrap(St#st{client = ClientPid}). + process_bootstrap(St#st{client = ClientPid}, NeedBootstrap). handle_connect_error(_Reason, St) -> %% TODO: logs @@ -426,14 +421,14 @@ handle_client_down(_Reason, St = #st{target = TargetCluster, actor = Actor}) -> ok = close_syncer(TargetCluster, Actor), process_connect(St#st{client = undefined}). -process_bootstrap(St = #st{bootstrapped = false}) -> +process_bootstrap(St = #st{bootstrapped = false}, _NeedBootstrap) -> run_bootstrap(St); -process_bootstrap(St = #st{client = ClientPid, bootstrapped = true}) -> - case client_session_present(ClientPid) of +process_bootstrap(St = #st{bootstrapped = true}, NeedBootstrap) -> + case NeedBootstrap of true -> - process_bootstrapped(St); + run_bootstrap(St); false -> - run_bootstrap(St) + process_bootstrapped(St) end. %% Bootstrapping.