feat(clusterlink): replicate shared subscription and persistent session routes

This commit is contained in:
Serge Tupchii 2024-05-21 20:07:37 +03:00
parent f036b641eb
commit e7305c62ee
11 changed files with 246 additions and 63 deletions

View File

@ -24,6 +24,12 @@
-callback maybe_add_route(emqx_types:topic()) -> ok.
-callback maybe_delete_route(emqx_types:topic()) -> ok.
-callback maybe_add_shared_route(emqx_types:topic(), emqx_types:group()) -> ok.
-callback maybe_delete_shared_route(emqx_types:topic(), emqx_types:group()) -> ok.
-callback maybe_add_persistent_route(emqx_types:topic(), emqx_persistent_session_ds:id()) -> ok.
-callback maybe_delete_persistent_route(emqx_types:topic(), emqx_persistent_session_ds:id()) -> ok.
-callback match_routes(emqx_types:topic()) -> [emqx_types:route()].
-type dest() :: term().
@ -36,6 +42,10 @@
should_route_to_external_dests/1,
maybe_add_route/1,
maybe_delete_route/1,
maybe_add_shared_route/2,
maybe_delete_shared_route/2,
maybe_add_persistent_route/2,
maybe_delete_persistent_route/2,
match_routes/1
]).
@ -113,6 +123,18 @@ maybe_add_route(Topic) ->
maybe_delete_route(Topic) ->
?safe_with_provider(?FUNCTION_NAME(Topic), ok).
maybe_add_shared_route(Topic, Group) ->
?safe_with_provider(?FUNCTION_NAME(Topic, Group), ok).
maybe_delete_shared_route(Topic, Group) ->
?safe_with_provider(?FUNCTION_NAME(Topic, Group), ok).
maybe_add_persistent_route(Topic, ID) ->
?safe_with_provider(?FUNCTION_NAME(Topic, ID), ok).
maybe_delete_persistent_route(Topic, ID) ->
?safe_with_provider(?FUNCTION_NAME(Topic, ID), ok).
match_routes(Topic) ->
?safe_with_provider(?FUNCTION_NAME(Topic), ok).

View File

@ -92,6 +92,7 @@ on_subscribe(TopicFilter, SubOpts, #{id := SessionId, s := S0, props := Props})
case emqx_persistent_session_ds_state:n_subscriptions(S0) < MaxSubscriptions of
true ->
ok = emqx_persistent_session_ds_router:do_add_route(TopicFilter, SessionId),
_ = emqx_external_broker:maybe_add_persistent_route(TopicFilter, SessionId),
{SubId, S1} = emqx_persistent_session_ds_state:new_id(S0),
{SStateId, S2} = emqx_persistent_session_ds_state:new_id(S1),
SState = #{
@ -154,6 +155,7 @@ on_unsubscribe(SessionId, TopicFilter, S0) ->
#{session_id => SessionId, topic_filter => TopicFilter},
ok = emqx_persistent_session_ds_router:do_delete_route(TopicFilter, SessionId)
),
_ = emqx_external_broker:maybe_delete_persistent_route(TopicFilter, SessionId),
{ok, emqx_persistent_session_ds_state:del_subscription(TopicFilter, S0), Subscription}
end.

View File

@ -91,7 +91,7 @@
deinit_schema/0
]).
-export_type([dest/0, external_dest/0]).
-export_type([dest/0]).
-export_type([schemavsn/0]).
-type group() :: binary().
@ -313,7 +313,7 @@ print_routes(Topic) ->
match_routes(Topic)
).
-spec cleanup_routes(node() | external_dest()) -> ok.
-spec cleanup_routes(node()) -> ok.
cleanup_routes(NodeOrExtDest) ->
cleanup_routes(get_schema_vsn(), NodeOrExtDest).

View File

@ -421,8 +421,12 @@ init_monitors() ->
handle_call({subscribe, Group, Topic, SubPid}, _From, State = #state{pmon = PMon}) ->
mria:dirty_write(?SHARED_SUBSCRIPTION, record(Group, Topic, SubPid)),
case ets:member(?SHARED_SUBSCRIBER, {Group, Topic}) of
true -> ok;
false -> ok = emqx_router:do_add_route(Topic, {Group, node()})
true ->
ok;
false ->
ok = emqx_router:do_add_route(Topic, {Group, node()}),
_ = emqx_external_broker:maybe_add_shared_route(Topic, Group),
ok
end,
ok = maybe_insert_alive_tab(SubPid),
ok = maybe_insert_round_robin_count({Group, Topic}),
@ -545,7 +549,9 @@ is_alive_sub(Pid) ->
delete_route_if_needed({Group, Topic} = GroupTopic) ->
if_no_more_subscribers(GroupTopic, fun() ->
ok = emqx_router:do_delete_route(Topic, {Group, node()})
ok = emqx_router:do_delete_route(Topic, {Group, node()}),
_ = emqx_external_broker:maybe_delete_shared_route(Topic, Group),
ok
end).
get_default_shared_subscription_strategy() ->

View File

@ -8,3 +8,7 @@
-define(MSG_TOPIC_PREFIX, ?TOPIC_PREFIX "msg/").
-define(DEST(FromClusterName), {external, {link, FromClusterName}}).
%% Fairly compact text encoding.
-define(SHARED_ROUTE_ID(Topic, Group), <<"$s/", Group/binary, "/", Topic/binary>>).
-define(PERSISTENT_ROUTE_ID(Topic, ID), <<"$p/", ID/binary, "/", Topic/binary>>).

View File

@ -0,0 +1,8 @@
%% -*- mode: erlang; -*-
{erl_opts, [debug_info]}.
{deps, [
{emqx, {path, "../../apps/emqx"}},
{emqx_resource, {path, "../../apps/emqx_resource"}}
]}.

View File

@ -11,6 +11,10 @@
unregister_external_broker/0,
maybe_add_route/1,
maybe_delete_route/1,
maybe_add_shared_route/2,
maybe_delete_shared_route/2,
maybe_add_persistent_route/2,
maybe_delete_persistent_route/2,
match_routes/1,
forward/2,
should_route_to_external_dests/1
@ -38,11 +42,29 @@ register_external_broker() ->
unregister_external_broker() ->
emqx_external_broker:unregister_provider(?MODULE).
%% Using original Topic as Route ID in the most common scenario:
%% (non-shared, non-persistent routes).
%% Original Topic is used to identify the route and be able
%% to delete it on a remote cluster.
%% There is no need to push Node name as this info can be derived from
%% agent state on the remote cluster.
maybe_add_route(Topic) ->
maybe_push_route_op(add, Topic).
maybe_push_route_op(add, Topic, Topic).
maybe_delete_route(Topic) ->
maybe_push_route_op(delete, Topic).
maybe_push_route_op(delete, Topic, Topic).
maybe_add_shared_route(Topic, Group) ->
maybe_push_route_op(add, Topic, ?SHARED_ROUTE_ID(Topic, Group)).
maybe_delete_shared_route(Topic, Group) ->
maybe_push_route_op(delete, Topic, ?SHARED_ROUTE_ID(Topic, Group)).
maybe_add_persistent_route(Topic, ID) ->
maybe_push_route_op(add, Topic, ?PERSISTENT_ROUTE_ID(Topic, ID), push_persistent_route).
maybe_delete_persistent_route(Topic, ID) ->
maybe_push_route_op(delete, Topic, ?PERSISTENT_ROUTE_ID(Topic, ID), push_persistent_route).
forward(DestCluster, Delivery) ->
emqx_cluster_link_mqtt:forward(DestCluster, Delivery).
@ -107,15 +129,17 @@ delete_hook() ->
%% Internal functions
%%--------------------------------------------------------------------
maybe_push_route_op(Op, Topic) ->
maybe_push_route_op(Op, Topic, RouteID) ->
maybe_push_route_op(Op, Topic, RouteID, push).
maybe_push_route_op(Op, Topic, RouteID, PushFun) ->
lists:foreach(
fun(#{upstream := Cluster, topics := LinkFilters}) ->
case topic_intersect_any(Topic, LinkFilters) of
false ->
ok;
TopicIntersection ->
ID = Topic,
emqx_cluster_link_router_syncer:push(Cluster, Op, TopicIntersection, ID)
emqx_cluster_link_router_syncer:PushFun(Cluster, Op, TopicIntersection, RouteID)
end
end,
emqx_cluster_link_config:enabled_links()

View File

@ -55,7 +55,7 @@ emqtt_options(LinkName) ->
emqx_maybe:apply(fun mk_emqtt_options/1, ?MODULE:link(LinkName)).
topic_filters(LinkName) ->
maps:get(filters, ?MODULE:link(LinkName), []).
maps:get(topics, ?MODULE:link(LinkName), []).
%%

View File

@ -169,14 +169,15 @@ mnesia_actor_init(Cluster, Actor, Incarnation, TS) ->
%% that applies some update out of the blue, but it seems impossible to prevent
%% it completely w/o transactions.
State = #state{cluster = Cluster, actor = Actor, incarnation = Incarnation},
case mnesia:read(?EXTROUTE_ACTOR_TAB, Actor, write) of
ActorID = ?ACTOR_ID(Cluster, Actor),
case mnesia:read(?EXTROUTE_ACTOR_TAB, ActorID, write) of
[#actor{incarnation = Incarnation, lane = Lane} = Rec] ->
ok = mnesia:write(?EXTROUTE_ACTOR_TAB, Rec#actor{until = bump_actor_ttl(TS)}, write),
{ok, State#state{lane = Lane}};
[] ->
Lane = mnesia_assign_lane(Cluster),
Rec = #actor{
id = ?ACTOR_ID(Cluster, Actor),
id = ActorID,
incarnation = Incarnation,
lane = Lane,
until = bump_actor_ttl(TS)

View File

@ -4,6 +4,10 @@
-module(emqx_cluster_link_router_bootstrap).
-include_lib("emqx/include/emqx_router.hrl").
-include_lib("emqx/include/emqx_shared_sub.hrl").
-include_lib("emqx/src/emqx_persistent_session_ds/emqx_ps_ds_int.hrl").
-include("emqx_cluster_link.hrl").
-export([
init/2,
@ -17,7 +21,8 @@
wildcards :: [emqx_types:topic()],
topics :: [emqx_types:topic()],
stash :: [{emqx_types:topic(), _RouteID}],
max_batch_size :: non_neg_integer()
max_batch_size :: non_neg_integer(),
is_persistent_route :: boolean()
}).
%%
@ -25,23 +30,25 @@
init(TargetCluster, Options) ->
LinkFilters = emqx_cluster_link_config:topic_filters(TargetCluster),
{Wildcards, Topics} = lists:partition(fun emqx_topic:wildcard/1, LinkFilters),
IsPersistentRoute = maps:get(is_persistent_route, Options, false),
#bootstrap{
target = TargetCluster,
wildcards = Wildcards,
topics = Topics,
stash = [],
max_batch_size = maps:get(max_batch_size, Options, ?MAX_BATCH_SIZE)
max_batch_size = maps:get(max_batch_size, Options, ?MAX_BATCH_SIZE),
is_persistent_route = IsPersistentRoute
}.
next_batch(B = #bootstrap{stash = S0 = [_ | _], max_batch_size = MBS}) ->
{Batch, Stash} = mk_batch(S0, MBS),
{Batch, B#bootstrap{stash = Stash}};
next_batch(B = #bootstrap{topics = Topics = [_ | _], stash = []}) ->
Routes = select_routes_by_topics(Topics),
next_batch(B#bootstrap{topics = [], stash = Routes});
next_batch(B0 = #bootstrap{wildcards = Wildcards = [_ | _], stash = []}) ->
Routes = select_routes_by_wildcards(Wildcards),
next_batch(B0#bootstrap{wildcards = [], stash = Routes});
next_batch(B = #bootstrap{topics = Topics = [_ | _], stash = [], is_persistent_route = IsPs}) ->
next_batch(B#bootstrap{topics = [], stash = routes_by_topic(Topics, IsPs)});
next_batch(
B0 = #bootstrap{wildcards = Wildcards = [_ | _], stash = [], is_persistent_route = IsPs}
) ->
next_batch(B0#bootstrap{wildcards = [], stash = routes_by_wildcards(Wildcards, IsPs)});
next_batch(#bootstrap{topics = [], wildcards = [], stash = []}) ->
done.
@ -53,6 +60,37 @@ mk_batch(Stash, MaxBatchSize) ->
%%
routes_by_topic(Topics, _IsPersistentRoute = false) ->
Routes = select_routes_by_topics(Topics),
SharedRoutes = select_shared_sub_routes_by_topics(Topics),
Routes ++ SharedRoutes;
routes_by_topic(Topics, _IsPersistentRoute = true) ->
lists:foldl(
fun(T, Acc) ->
Routes = emqx_persistent_session_ds_router:lookup_routes(T),
[encode_route(T, ?PERSISTENT_ROUTE_ID(T, D)) || #ps_route{dest = D} <- Routes] ++ Acc
end,
[],
Topics
).
routes_by_wildcards(Wildcards, _IsPersistentRoute = false) ->
Routes = select_routes_by_wildcards(Wildcards),
SharedRoutes = select_shared_sub_routes_by_wildcards(Wildcards),
Routes ++ SharedRoutes;
routes_by_wildcards(Wildcards, _IsPersistentRoute = true) ->
emqx_persistent_session_ds_router:foldl_routes(
fun(#ps_route{dest = D, topic = T}, Acc) ->
case topic_intersect_any(T, Wildcards) of
false ->
Acc;
Intersec ->
[encode_route(Intersec, ?PERSISTENT_ROUTE_ID(T, D)) | Acc]
end
end,
[]
).
select_routes_by_topics(Topics) ->
[encode_route(Topic, Topic) || Topic <- Topics, emqx_broker:subscribers(Topic) =/= []].
@ -63,12 +101,34 @@ select_routes_by_wildcards(Wildcards) ->
?SUBSCRIBER
).
select_shared_sub_routes_by_topics([T | Topics]) ->
select_shared_sub_routes(T) ++ select_shared_sub_routes_by_topics(Topics);
select_shared_sub_routes_by_topics([]) ->
[].
select_shared_sub_routes_by_wildcards(Wildcards) ->
emqx_utils_ets:keyfoldl(
fun({Group, Topic}, Acc) ->
RouteID = ?SHARED_ROUTE_ID(Topic, Group),
intersecting_route(Topic, RouteID, Wildcards) ++ Acc
end,
[],
?SHARED_SUBSCRIBER
).
select_shared_sub_routes(Topic) ->
LocalGroups = lists:usort(ets:select(?SHARED_SUBSCRIBER, [{{{'$1', Topic}, '_'}, [], ['$1']}])),
[encode_route(Topic, ?SHARED_ROUTE_ID(Topic, G)) || G <- LocalGroups].
intersecting_route(Topic, Wildcards) ->
intersecting_route(Topic, Topic, Wildcards).
intersecting_route(Topic, RouteID, Wildcards) ->
%% TODO: probably nice to validate cluster link topic filters
%% to have no intersections between each other?
case topic_intersect_any(Topic, Wildcards) of
false -> [];
Intersection -> [encode_route(Intersection, Topic)]
Intersection -> [encode_route(Intersection, RouteID)]
end.
topic_intersect_any(Topic, [LinkFilter | T]) ->

View File

@ -7,11 +7,15 @@
%% API
-export([start_link/1]).
-export([push/4]).
-export([
start_link_actor/1,
start_link_syncer/1
push/4,
push_persistent_route/4
]).
-export([
start_link_actor/4,
start_link_syncer/4
]).
%% Internal API / Syncer
@ -47,10 +51,25 @@
-define(RECONNECT_TIMEOUT, 5_000).
%%
%% Special actor for persistent routes that has the same actor name on all nodes.
%% Node actors with the same name nay race with each other (e.g. during bootstrap),
%% but it must be tolerable, since persistent route destination is a client ID,
%% which is unique cluster-wide.
-define(PS_ACTOR, <<"ps-routes-v1">>).
-define(PS_INCARNATION, 0).
-define(PS_ACTOR_REF(Cluster), {via, gproc, ?NAME(Cluster, ps_actor)}).
-define(PS_CLIENT_NAME(Cluster), ?NAME(Cluster, ps_client)).
-define(PS_SYNCER_REF(Cluster), {via, gproc, ?PS_SYNCER_NAME(Cluster)}).
-define(PS_SYNCER_NAME(Cluster), ?NAME(Cluster, ps_syncer)).
push(TargetCluster, OpName, Topic, ID) ->
case gproc:where(?SYNCER_NAME(TargetCluster)) of
do_push(?SYNCER_NAME(TargetCluster), OpName, Topic, ID).
push_persistent_route(TargetCluster, OpName, Topic, ID) ->
do_push(?PS_SYNCER_NAME(TargetCluster), OpName, Topic, ID).
do_push(SyncerName, OpName, Topic, ID) ->
case gproc:where(SyncerName) of
SyncerPid when is_pid(SyncerPid) ->
emqx_router_syncer:push(SyncerPid, OpName, Topic, ID, #{});
undefined ->
@ -66,11 +85,9 @@ start_link(TargetCluster) ->
%% Actor
start_link_actor(TargetCluster) ->
Actor = get_actor_id(),
Incarnation = ensure_actor_incarnation(),
start_link_actor(ActorRef, Actor, Incarnation, TargetCluster) ->
gen_server:start_link(
?ACTOR_REF(TargetCluster),
ActorRef,
?MODULE,
{actor, mk_state(TargetCluster, Actor, Incarnation)},
[]
@ -98,9 +115,9 @@ ensure_actor_incarnation() ->
%% MQTT Client
start_link_client(TargetCluster) ->
start_link_client(TargetCluster, Actor) ->
Options = emqx_cluster_link_config:emqtt_options(TargetCluster),
case emqtt:start_link(refine_client_options(Options)) of
case emqtt:start_link(refine_client_options(Options, Actor)) of
{ok, Pid} ->
case emqtt:connect(Pid) of
{ok, _Props} ->
@ -112,10 +129,15 @@ start_link_client(TargetCluster) ->
Error
end.
refine_client_options(Options = #{clientid := ClientID}) ->
refine_client_options(Options = #{clientid := ClientID}, Actor) ->
Suffix =
case Actor of
?PS_ACTOR -> "-ps";
_ -> ""
end,
%% TODO: Reconnect should help, but it looks broken right now.
Options#{
clientid => emqx_utils:format("~s:~s:routesync", [ClientID, node()]),
clientid => emqx_utils:format("~s:~s:routesync~s", [ClientID, node(), Suffix]),
clean_start => false,
properties => #{'Session-Expiry-Interval' => 60},
retry_interval => 0
@ -129,8 +151,13 @@ client_session_present(ClientPid) ->
1 -> true
end.
announce_client(TargetCluster, Pid) ->
true = gproc:reg_other(?CLIENT_NAME(TargetCluster), Pid),
announce_client(Actor, TargetCluster, Pid) ->
Name =
case Actor of
?PS_ACTOR -> ?PS_CLIENT_NAME(TargetCluster);
_ -> ?CLIENT_NAME(TargetCluster)
end,
true = gproc:reg_other(Name, Pid),
ok.
publish_routes(ClientPid, Actor, Incarnation, Updates) ->
@ -148,19 +175,17 @@ publish_routes(ClientPid, Actor, Incarnation, Updates) ->
%% Route syncer
start_syncer(TargetCluster) ->
case supervisor:start_child(?REF(TargetCluster), child_spec(syncer, TargetCluster)) of
start_syncer(TargetCluster, Actor, Incr) ->
Spec = child_spec(syncer, Actor, Incr, TargetCluster),
case supervisor:start_child(?REF(TargetCluster), Spec) of
{ok, _} ->
ok;
{error, {already_started, _}} ->
ok
end.
start_link_syncer(TargetCluster) ->
Actor = get_actor_id(),
Incarnation = get_actor_incarnation(),
ClientName = ?CLIENT_NAME(TargetCluster),
emqx_router_syncer:start_link(?SYNCER_REF(TargetCluster), #{
start_link_syncer(Actor, Incarnation, SyncerRef, ClientName) ->
emqx_router_syncer:start_link(SyncerRef, #{
max_batch_size => ?MAX_BATCH_SIZE,
min_sync_interval => ?MIN_SYNC_INTERVAL,
error_delay => ?ERROR_DELAY,
@ -169,10 +194,14 @@ start_link_syncer(TargetCluster) ->
%% TODO: enable_replies => false
}).
close_syncer(TargetCluster) ->
close_syncer(TargetCluster, ?PS_ACTOR) ->
emqx_router_syncer:close(?PS_SYNCER_REF(TargetCluster));
close_syncer(TargetCluster, _Actor) ->
emqx_router_syncer:close(?SYNCER_REF(TargetCluster)).
open_syncer(TargetCluster) ->
open_syncer(TargetCluster, ?PS_ACTOR) ->
emqx_router_syncer:open(?PS_SYNCER_REF(TargetCluster));
open_syncer(TargetCluster, _Actor) ->
emqx_router_syncer:open(?SYNCER_REF(TargetCluster)).
process_syncer_batch(Batch, ClientName, Actor, Incarnation) ->
@ -200,7 +229,8 @@ init({sup, TargetCluster}) ->
period => 60
},
Children = [
child_spec(actor, TargetCluster)
child_spec(actor, TargetCluster),
child_spec(ps_actor, TargetCluster)
],
{ok, {SupFlags, Children}};
init({actor, State}) ->
@ -212,20 +242,37 @@ child_spec(actor, TargetCluster) ->
%% ClientID: `mycluster:emqx1@emqx.local:routesync`
%% Occasional TCP/MQTT-level disconnects are expected, and should be handled
%% gracefully.
#{
id => actor,
start => {?MODULE, start_link_actor, [TargetCluster]},
restart => permanent,
type => worker
};
child_spec(syncer, TargetCluster) ->
Actor = get_actor_id(),
Incarnation = ensure_actor_incarnation(),
actor_spec(actor, ?ACTOR_REF(TargetCluster), Actor, Incarnation, TargetCluster);
child_spec(ps_actor, TargetCluster) ->
actor_spec(ps_actor, ?PS_ACTOR_REF(TargetCluster), ?PS_ACTOR, ?PS_INCARNATION, TargetCluster).
child_spec(syncer, ?PS_ACTOR, Incarnation, TargetCluster) ->
SyncerRef = ?PS_SYNCER_REF(TargetCluster),
ClientName = ?PS_CLIENT_NAME(TargetCluster),
syncer_spec(ps_syncer, ?PS_ACTOR, Incarnation, SyncerRef, ClientName);
child_spec(syncer, Actor, Incarnation, TargetCluster) ->
%% Route syncer process.
%% Initially starts in a "closed" state. Actor decides when to open it, i.e.
%% when bootstrapping is done. Syncer crash means re-bootstrap is needed, so
%% we just restart the actor in this case.
SyncerRef = ?SYNCER_REF(TargetCluster),
ClientName = ?CLIENT_NAME(TargetCluster),
syncer_spec(syncer, Actor, Incarnation, SyncerRef, ClientName).
actor_spec(ChildID, ActorRef, Actor, Incarnation, TargetCluster) ->
#{
id => syncer,
start => {?MODULE, start_link_syncer, [TargetCluster]},
id => ChildID,
start => {?MODULE, start_link_actor, [ActorRef, Actor, Incarnation, TargetCluster]},
restart => permanent,
type => worker
}.
syncer_spec(ChildID, Actor, Incarnation, SyncerRef, ClientName) ->
#{
id => ChildID,
start => {?MODULE, start_link_syncer, [Actor, Incarnation, SyncerRef, ClientName]},
restart => permanent,
type => worker
}.
@ -274,12 +321,12 @@ terminate(_Reason, _State) ->
ok.
process_connect(St = #st{target = TargetCluster, actor = Actor, incarnation = Incr}) ->
case start_link_client(TargetCluster) of
case start_link_client(TargetCluster, Actor) of
{ok, ClientPid} ->
%% TODO: error handling, handshake
{ok, _} = emqx_cluster_link_mqtt:publish_actor_init_sync(ClientPid, Actor, Incr),
ok = start_syncer(TargetCluster),
ok = announce_client(TargetCluster, ClientPid),
ok = start_syncer(TargetCluster, Actor, Incr),
ok = announce_client(Actor, TargetCluster, ClientPid),
process_bootstrap(St#st{client = ClientPid});
{error, Reason} ->
handle_connect_error(Reason, St)
@ -290,9 +337,9 @@ handle_connect_error(_Reason, St) ->
TRef = erlang:start_timer(?RECONNECT_TIMEOUT, self(), reconnect),
St#st{reconnect_timer = TRef}.
handle_client_down(_Reason, St = #st{target = TargetCluster}) ->
handle_client_down(_Reason, St = #st{target = TargetCluster, actor = Actor}) ->
%% TODO: logs
ok = close_syncer(TargetCluster),
ok = close_syncer(TargetCluster, Actor),
process_connect(St#st{client = undefined}).
process_bootstrap(St = #st{bootstrapped = false}) ->
@ -311,6 +358,15 @@ process_bootstrap(St = #st{client = ClientPid, bootstrapped = true}) ->
%% is re-established with a clean session. Once bootstrapping is done, it
%% opens the syncer.
run_bootstrap(St = #st{target = TargetCluster, actor = ?PS_ACTOR}) ->
case mria_config:whoami() of
Role when Role /= replicant ->
Opts = #{is_persistent_route => true},
Bootstrap = emqx_cluster_link_router_bootstrap:init(TargetCluster, Opts),
run_bootstrap(Bootstrap, St);
_ ->
process_bootstrapped(St)
end;
run_bootstrap(St = #st{target = TargetCluster}) ->
Bootstrap = emqx_cluster_link_router_bootstrap:init(TargetCluster, #{}),
run_bootstrap(Bootstrap, St).
@ -330,8 +386,8 @@ run_bootstrap(Bootstrap, St) ->
end
end.
process_bootstrapped(St = #st{target = TargetCluster}) ->
ok = open_syncer(TargetCluster),
process_bootstrapped(St = #st{target = TargetCluster, actor = Actor}) ->
ok = open_syncer(TargetCluster, Actor),
St#st{bootstrapped = true}.
process_bootstrap_batch(Batch, #st{client = ClientPid, actor = Actor, incarnation = Incarnation}) ->