From e7305c62ee31ab4f5cd59be3554e2e16887bbef4 Mon Sep 17 00:00:00 2001 From: Serge Tupchii Date: Tue, 21 May 2024 20:07:37 +0300 Subject: [PATCH] feat(clusterlink): replicate shared subscription and persistent session routes --- apps/emqx/src/emqx_external_broker.erl | 22 +++ .../emqx_persistent_session_ds_subs.erl | 2 + apps/emqx/src/emqx_router.erl | 4 +- apps/emqx/src/emqx_shared_sub.erl | 12 +- .../include/emqx_cluster_link.hrl | 4 + apps/emqx_cluster_link/rebar.config | 8 + .../src/emqx_cluster_link.erl | 34 ++++- .../src/emqx_cluster_link_config.erl | 2 +- .../src/emqx_cluster_link_extrouter.erl | 5 +- .../emqx_cluster_link_router_bootstrap.erl | 78 ++++++++-- .../src/emqx_cluster_link_router_syncer.erl | 138 ++++++++++++------ 11 files changed, 246 insertions(+), 63 deletions(-) create mode 100644 apps/emqx_cluster_link/rebar.config diff --git a/apps/emqx/src/emqx_external_broker.erl b/apps/emqx/src/emqx_external_broker.erl index 3b3ff83c6..acd4b8c3d 100644 --- a/apps/emqx/src/emqx_external_broker.erl +++ b/apps/emqx/src/emqx_external_broker.erl @@ -24,6 +24,12 @@ -callback maybe_add_route(emqx_types:topic()) -> ok. -callback maybe_delete_route(emqx_types:topic()) -> ok. +-callback maybe_add_shared_route(emqx_types:topic(), emqx_types:group()) -> ok. +-callback maybe_delete_shared_route(emqx_types:topic(), emqx_types:group()) -> ok. + +-callback maybe_add_persistent_route(emqx_types:topic(), emqx_persistent_session_ds:id()) -> ok. +-callback maybe_delete_persistent_route(emqx_types:topic(), emqx_persistent_session_ds:id()) -> ok. + -callback match_routes(emqx_types:topic()) -> [emqx_types:route()]. -type dest() :: term(). @@ -36,6 +42,10 @@ should_route_to_external_dests/1, maybe_add_route/1, maybe_delete_route/1, + maybe_add_shared_route/2, + maybe_delete_shared_route/2, + maybe_add_persistent_route/2, + maybe_delete_persistent_route/2, match_routes/1 ]). 
@@ -113,6 +123,18 @@ maybe_add_route(Topic) -> maybe_delete_route(Topic) -> ?safe_with_provider(?FUNCTION_NAME(Topic), ok). +maybe_add_shared_route(Topic, Group) -> + ?safe_with_provider(?FUNCTION_NAME(Topic, Group), ok). + +maybe_delete_shared_route(Topic, Group) -> + ?safe_with_provider(?FUNCTION_NAME(Topic, Group), ok). + +maybe_add_persistent_route(Topic, ID) -> + ?safe_with_provider(?FUNCTION_NAME(Topic, ID), ok). + +maybe_delete_persistent_route(Topic, ID) -> + ?safe_with_provider(?FUNCTION_NAME(Topic, ID), ok). + match_routes(Topic) -> ?safe_with_provider(?FUNCTION_NAME(Topic), ok). diff --git a/apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_subs.erl b/apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_subs.erl index e8422674b..fc86b67a6 100644 --- a/apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_subs.erl +++ b/apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_subs.erl @@ -92,6 +92,7 @@ on_subscribe(TopicFilter, SubOpts, #{id := SessionId, s := S0, props := Props}) case emqx_persistent_session_ds_state:n_subscriptions(S0) < MaxSubscriptions of true -> ok = emqx_persistent_session_ds_router:do_add_route(TopicFilter, SessionId), + _ = emqx_external_broker:maybe_add_persistent_route(TopicFilter, SessionId), {SubId, S1} = emqx_persistent_session_ds_state:new_id(S0), {SStateId, S2} = emqx_persistent_session_ds_state:new_id(S1), SState = #{ @@ -154,6 +155,7 @@ on_unsubscribe(SessionId, TopicFilter, S0) -> #{session_id => SessionId, topic_filter => TopicFilter}, ok = emqx_persistent_session_ds_router:do_delete_route(TopicFilter, SessionId) ), + _ = emqx_external_broker:maybe_delete_persistent_route(TopicFilter, SessionId), {ok, emqx_persistent_session_ds_state:del_subscription(TopicFilter, S0), Subscription} end. 
diff --git a/apps/emqx/src/emqx_router.erl b/apps/emqx/src/emqx_router.erl index c60bd2383..0bcaf3c0e 100644 --- a/apps/emqx/src/emqx_router.erl +++ b/apps/emqx/src/emqx_router.erl @@ -91,7 +91,7 @@ deinit_schema/0 ]). --export_type([dest/0, external_dest/0]). +-export_type([dest/0]). -export_type([schemavsn/0]). -type group() :: binary(). @@ -313,7 +313,7 @@ print_routes(Topic) -> match_routes(Topic) ). --spec cleanup_routes(node() | external_dest()) -> ok. +-spec cleanup_routes(node()) -> ok. cleanup_routes(NodeOrExtDest) -> cleanup_routes(get_schema_vsn(), NodeOrExtDest). diff --git a/apps/emqx/src/emqx_shared_sub.erl b/apps/emqx/src/emqx_shared_sub.erl index 54c107111..4498523da 100644 --- a/apps/emqx/src/emqx_shared_sub.erl +++ b/apps/emqx/src/emqx_shared_sub.erl @@ -421,8 +421,12 @@ init_monitors() -> handle_call({subscribe, Group, Topic, SubPid}, _From, State = #state{pmon = PMon}) -> mria:dirty_write(?SHARED_SUBSCRIPTION, record(Group, Topic, SubPid)), case ets:member(?SHARED_SUBSCRIBER, {Group, Topic}) of - true -> ok; - false -> ok = emqx_router:do_add_route(Topic, {Group, node()}) + true -> + ok; + false -> + ok = emqx_router:do_add_route(Topic, {Group, node()}), + _ = emqx_external_broker:maybe_add_shared_route(Topic, Group), + ok end, ok = maybe_insert_alive_tab(SubPid), ok = maybe_insert_round_robin_count({Group, Topic}), @@ -545,7 +549,9 @@ is_alive_sub(Pid) -> delete_route_if_needed({Group, Topic} = GroupTopic) -> if_no_more_subscribers(GroupTopic, fun() -> - ok = emqx_router:do_delete_route(Topic, {Group, node()}) + ok = emqx_router:do_delete_route(Topic, {Group, node()}), + _ = emqx_external_broker:maybe_delete_shared_route(Topic, Group), + ok end). 
get_default_shared_subscription_strategy() -> diff --git a/apps/emqx_cluster_link/include/emqx_cluster_link.hrl b/apps/emqx_cluster_link/include/emqx_cluster_link.hrl index 42eb7ca7b..8bf9dd7c2 100644 --- a/apps/emqx_cluster_link/include/emqx_cluster_link.hrl +++ b/apps/emqx_cluster_link/include/emqx_cluster_link.hrl @@ -8,3 +8,7 @@ -define(MSG_TOPIC_PREFIX, ?TOPIC_PREFIX "msg/"). -define(DEST(FromClusterName), {external, {link, FromClusterName}}). + +%% Fairly compact text encoding. +-define(SHARED_ROUTE_ID(Topic, Group), <<"$s/", Group/binary, "/", Topic/binary>>). +-define(PERSISTENT_ROUTE_ID(Topic, ID), <<"$p/", ID/binary, "/", Topic/binary>>). diff --git a/apps/emqx_cluster_link/rebar.config b/apps/emqx_cluster_link/rebar.config new file mode 100644 index 000000000..8835441e4 --- /dev/null +++ b/apps/emqx_cluster_link/rebar.config @@ -0,0 +1,8 @@ +%% -*- mode: erlang; -*- + +{erl_opts, [debug_info]}. + +{deps, [ + {emqx, {path, "../../apps/emqx"}}, + {emqx_resource, {path, "../../apps/emqx_resource"}} +]}. diff --git a/apps/emqx_cluster_link/src/emqx_cluster_link.erl b/apps/emqx_cluster_link/src/emqx_cluster_link.erl index 579f58dce..8d843edcc 100644 --- a/apps/emqx_cluster_link/src/emqx_cluster_link.erl +++ b/apps/emqx_cluster_link/src/emqx_cluster_link.erl @@ -11,6 +11,10 @@ unregister_external_broker/0, maybe_add_route/1, maybe_delete_route/1, + maybe_add_shared_route/2, + maybe_delete_shared_route/2, + maybe_add_persistent_route/2, + maybe_delete_persistent_route/2, match_routes/1, forward/2, should_route_to_external_dests/1 @@ -38,11 +42,29 @@ register_external_broker() -> unregister_external_broker() -> emqx_external_broker:unregister_provider(?MODULE). +%% Using original Topic as Route ID in the most common scenario: +%% (non-shared, non-persistent routes). +%% Original Topic is used to identify the route and be able +%% to delete it on a remote cluster. 
+%% There is no need to push Node name as this info can be derived from +%% agent state on the remote cluster. maybe_add_route(Topic) -> - maybe_push_route_op(add, Topic). + maybe_push_route_op(add, Topic, Topic). maybe_delete_route(Topic) -> - maybe_push_route_op(delete, Topic). + maybe_push_route_op(delete, Topic, Topic). + +maybe_add_shared_route(Topic, Group) -> + maybe_push_route_op(add, Topic, ?SHARED_ROUTE_ID(Topic, Group)). + +maybe_delete_shared_route(Topic, Group) -> + maybe_push_route_op(delete, Topic, ?SHARED_ROUTE_ID(Topic, Group)). + +maybe_add_persistent_route(Topic, ID) -> + maybe_push_route_op(add, Topic, ?PERSISTENT_ROUTE_ID(Topic, ID), push_persistent_route). + +maybe_delete_persistent_route(Topic, ID) -> + maybe_push_route_op(delete, Topic, ?PERSISTENT_ROUTE_ID(Topic, ID), push_persistent_route). forward(DestCluster, Delivery) -> emqx_cluster_link_mqtt:forward(DestCluster, Delivery). @@ -107,15 +129,17 @@ delete_hook() -> %% Internal functions %%-------------------------------------------------------------------- -maybe_push_route_op(Op, Topic) -> +maybe_push_route_op(Op, Topic, RouteID) -> + maybe_push_route_op(Op, Topic, RouteID, push). 
+ +maybe_push_route_op(Op, Topic, RouteID, PushFun) -> lists:foreach( fun(#{upstream := Cluster, topics := LinkFilters}) -> case topic_intersect_any(Topic, LinkFilters) of false -> ok; TopicIntersection -> - ID = Topic, - emqx_cluster_link_router_syncer:push(Cluster, Op, TopicIntersection, ID) + emqx_cluster_link_router_syncer:PushFun(Cluster, Op, TopicIntersection, RouteID) end end, emqx_cluster_link_config:enabled_links() diff --git a/apps/emqx_cluster_link/src/emqx_cluster_link_config.erl b/apps/emqx_cluster_link/src/emqx_cluster_link_config.erl index be2769fd5..ba17d22e8 100644 --- a/apps/emqx_cluster_link/src/emqx_cluster_link_config.erl +++ b/apps/emqx_cluster_link/src/emqx_cluster_link_config.erl @@ -55,7 +55,7 @@ emqtt_options(LinkName) -> emqx_maybe:apply(fun mk_emqtt_options/1, ?MODULE:link(LinkName)). topic_filters(LinkName) -> - maps:get(filters, ?MODULE:link(LinkName), []). + maps:get(topics, ?MODULE:link(LinkName), []). %% diff --git a/apps/emqx_cluster_link/src/emqx_cluster_link_extrouter.erl b/apps/emqx_cluster_link/src/emqx_cluster_link_extrouter.erl index 504e59c74..bbec844df 100644 --- a/apps/emqx_cluster_link/src/emqx_cluster_link_extrouter.erl +++ b/apps/emqx_cluster_link/src/emqx_cluster_link_extrouter.erl @@ -169,14 +169,15 @@ mnesia_actor_init(Cluster, Actor, Incarnation, TS) -> %% that applies some update out of the blue, but it seems impossible to prevent %% it completely w/o transactions. 
State = #state{cluster = Cluster, actor = Actor, incarnation = Incarnation}, - case mnesia:read(?EXTROUTE_ACTOR_TAB, Actor, write) of + ActorID = ?ACTOR_ID(Cluster, Actor), + case mnesia:read(?EXTROUTE_ACTOR_TAB, ActorID, write) of [#actor{incarnation = Incarnation, lane = Lane} = Rec] -> ok = mnesia:write(?EXTROUTE_ACTOR_TAB, Rec#actor{until = bump_actor_ttl(TS)}, write), {ok, State#state{lane = Lane}}; [] -> Lane = mnesia_assign_lane(Cluster), Rec = #actor{ - id = ?ACTOR_ID(Cluster, Actor), + id = ActorID, incarnation = Incarnation, lane = Lane, until = bump_actor_ttl(TS) diff --git a/apps/emqx_cluster_link/src/emqx_cluster_link_router_bootstrap.erl b/apps/emqx_cluster_link/src/emqx_cluster_link_router_bootstrap.erl index 8c0e609dc..105b8d94c 100644 --- a/apps/emqx_cluster_link/src/emqx_cluster_link_router_bootstrap.erl +++ b/apps/emqx_cluster_link/src/emqx_cluster_link_router_bootstrap.erl @@ -4,6 +4,10 @@ -module(emqx_cluster_link_router_bootstrap). -include_lib("emqx/include/emqx_router.hrl"). +-include_lib("emqx/include/emqx_shared_sub.hrl"). +-include_lib("emqx/src/emqx_persistent_session_ds/emqx_ps_ds_int.hrl"). + +-include("emqx_cluster_link.hrl"). -export([ init/2, @@ -17,7 +21,8 @@ wildcards :: [emqx_types:topic()], topics :: [emqx_types:topic()], stash :: [{emqx_types:topic(), _RouteID}], - max_batch_size :: non_neg_integer() + max_batch_size :: non_neg_integer(), + is_persistent_route :: boolean() }). %% @@ -25,23 +30,25 @@ init(TargetCluster, Options) -> LinkFilters = emqx_cluster_link_config:topic_filters(TargetCluster), {Wildcards, Topics} = lists:partition(fun emqx_topic:wildcard/1, LinkFilters), + IsPersistentRoute = maps:get(is_persistent_route, Options, false), #bootstrap{ target = TargetCluster, wildcards = Wildcards, topics = Topics, stash = [], - max_batch_size = maps:get(max_batch_size, Options, ?MAX_BATCH_SIZE) + max_batch_size = maps:get(max_batch_size, Options, ?MAX_BATCH_SIZE), + is_persistent_route = IsPersistentRoute }. 
next_batch(B = #bootstrap{stash = S0 = [_ | _], max_batch_size = MBS}) -> {Batch, Stash} = mk_batch(S0, MBS), {Batch, B#bootstrap{stash = Stash}}; -next_batch(B = #bootstrap{topics = Topics = [_ | _], stash = []}) -> - Routes = select_routes_by_topics(Topics), - next_batch(B#bootstrap{topics = [], stash = Routes}); -next_batch(B0 = #bootstrap{wildcards = Wildcards = [_ | _], stash = []}) -> - Routes = select_routes_by_wildcards(Wildcards), - next_batch(B0#bootstrap{wildcards = [], stash = Routes}); +next_batch(B = #bootstrap{topics = Topics = [_ | _], stash = [], is_persistent_route = IsPs}) -> + next_batch(B#bootstrap{topics = [], stash = routes_by_topic(Topics, IsPs)}); +next_batch( + B0 = #bootstrap{wildcards = Wildcards = [_ | _], stash = [], is_persistent_route = IsPs} +) -> + next_batch(B0#bootstrap{wildcards = [], stash = routes_by_wildcards(Wildcards, IsPs)}); next_batch(#bootstrap{topics = [], wildcards = [], stash = []}) -> done. @@ -53,6 +60,37 @@ mk_batch(Stash, MaxBatchSize) -> %% +routes_by_topic(Topics, _IsPersistentRoute = false) -> + Routes = select_routes_by_topics(Topics), + SharedRoutes = select_shared_sub_routes_by_topics(Topics), + Routes ++ SharedRoutes; +routes_by_topic(Topics, _IsPersistentRoute = true) -> + lists:foldl( + fun(T, Acc) -> + Routes = emqx_persistent_session_ds_router:lookup_routes(T), + [encode_route(T, ?PERSISTENT_ROUTE_ID(T, D)) || #ps_route{dest = D} <- Routes] ++ Acc + end, + [], + Topics + ). + +routes_by_wildcards(Wildcards, _IsPersistentRoute = false) -> + Routes = select_routes_by_wildcards(Wildcards), + SharedRoutes = select_shared_sub_routes_by_wildcards(Wildcards), + Routes ++ SharedRoutes; +routes_by_wildcards(Wildcards, _IsPersistentRoute = true) -> + emqx_persistent_session_ds_router:foldl_routes( + fun(#ps_route{dest = D, topic = T}, Acc) -> + case topic_intersect_any(T, Wildcards) of + false -> + Acc; + Intersec -> + [encode_route(Intersec, ?PERSISTENT_ROUTE_ID(T, D)) | Acc] + end + end, + [] + ). 
+ select_routes_by_topics(Topics) -> [encode_route(Topic, Topic) || Topic <- Topics, emqx_broker:subscribers(Topic) =/= []]. @@ -63,12 +101,34 @@ select_routes_by_wildcards(Wildcards) -> ?SUBSCRIBER ). +select_shared_sub_routes_by_topics([T | Topics]) -> + select_shared_sub_routes(T) ++ select_shared_sub_routes_by_topics(Topics); +select_shared_sub_routes_by_topics([]) -> + []. + +select_shared_sub_routes_by_wildcards(Wildcards) -> + emqx_utils_ets:keyfoldl( + fun({Group, Topic}, Acc) -> + RouteID = ?SHARED_ROUTE_ID(Topic, Group), + intersecting_route(Topic, RouteID, Wildcards) ++ Acc + end, + [], + ?SHARED_SUBSCRIBER + ). + +select_shared_sub_routes(Topic) -> + LocalGroups = lists:usort(ets:select(?SHARED_SUBSCRIBER, [{{{'$1', Topic}, '_'}, [], ['$1']}])), + [encode_route(Topic, ?SHARED_ROUTE_ID(Topic, G)) || G <- LocalGroups]. + intersecting_route(Topic, Wildcards) -> + intersecting_route(Topic, Topic, Wildcards). + +intersecting_route(Topic, RouteID, Wildcards) -> %% TODO: probably nice to validate cluster link topic filters %% to have no intersections between each other? case topic_intersect_any(Topic, Wildcards) of false -> []; - Intersection -> [encode_route(Intersection, Topic)] + Intersection -> [encode_route(Intersection, RouteID)] end. topic_intersect_any(Topic, [LinkFilter | T]) -> diff --git a/apps/emqx_cluster_link/src/emqx_cluster_link_router_syncer.erl b/apps/emqx_cluster_link/src/emqx_cluster_link_router_syncer.erl index d41a41c5f..62d434071 100644 --- a/apps/emqx_cluster_link/src/emqx_cluster_link_router_syncer.erl +++ b/apps/emqx_cluster_link/src/emqx_cluster_link_router_syncer.erl @@ -7,11 +7,15 @@ %% API -export([start_link/1]). --export([push/4]). -export([ - start_link_actor/1, - start_link_syncer/1 + push/4, + push_persistent_route/4 +]). + +-export([ + start_link_actor/4, + start_link_syncer/4 ]). %% Internal API / Syncer @@ -47,10 +51,25 @@ -define(RECONNECT_TIMEOUT, 5_000). 
-%% +%% Special actor for persistent routes that has the same actor name on all nodes. +%% Node actors with the same name may race with each other (e.g. during bootstrap), +%% but it must be tolerable, since persistent route destination is a client ID, +%% which is unique cluster-wide. +-define(PS_ACTOR, <<"ps-routes-v1">>). +-define(PS_INCARNATION, 0). +-define(PS_ACTOR_REF(Cluster), {via, gproc, ?NAME(Cluster, ps_actor)}). +-define(PS_CLIENT_NAME(Cluster), ?NAME(Cluster, ps_client)). +-define(PS_SYNCER_REF(Cluster), {via, gproc, ?PS_SYNCER_NAME(Cluster)}). +-define(PS_SYNCER_NAME(Cluster), ?NAME(Cluster, ps_syncer)). push(TargetCluster, OpName, Topic, ID) -> - case gproc:where(?SYNCER_NAME(TargetCluster)) of + do_push(?SYNCER_NAME(TargetCluster), OpName, Topic, ID). + +push_persistent_route(TargetCluster, OpName, Topic, ID) -> + do_push(?PS_SYNCER_NAME(TargetCluster), OpName, Topic, ID). + +do_push(SyncerName, OpName, Topic, ID) -> + case gproc:where(SyncerName) of SyncerPid when is_pid(SyncerPid) -> emqx_router_syncer:push(SyncerPid, OpName, Topic, ID, #{}); undefined -> @@ -66,11 +85,9 @@ start_link(TargetCluster) -> %% Actor -start_link_actor(TargetCluster) -> - Actor = get_actor_id(), - Incarnation = ensure_actor_incarnation(), +start_link_actor(ActorRef, Actor, Incarnation, TargetCluster) -> gen_server:start_link( - ?ACTOR_REF(TargetCluster), + ActorRef, ?MODULE, {actor, mk_state(TargetCluster, Actor, Incarnation)}, [] @@ -98,9 +115,9 @@ ensure_actor_incarnation() -> %% MQTT Client -start_link_client(TargetCluster) -> +start_link_client(TargetCluster, Actor) -> Options = emqx_cluster_link_config:emqtt_options(TargetCluster), - case emqtt:start_link(refine_client_options(Options)) of + case emqtt:start_link(refine_client_options(Options, Actor)) of {ok, Pid} -> case emqtt:connect(Pid) of {ok, _Props} -> @@ -112,10 +129,15 @@ start_link_client(TargetCluster) -> Error end. 
-refine_client_options(Options = #{clientid := ClientID}) -> +refine_client_options(Options = #{clientid := ClientID}, Actor) -> + Suffix = + case Actor of + ?PS_ACTOR -> "-ps"; + _ -> "" + end, %% TODO: Reconnect should help, but it looks broken right now. Options#{ - clientid => emqx_utils:format("~s:~s:routesync", [ClientID, node()]), + clientid => emqx_utils:format("~s:~s:routesync~s", [ClientID, node(), Suffix]), clean_start => false, properties => #{'Session-Expiry-Interval' => 60}, retry_interval => 0 @@ -129,8 +151,13 @@ client_session_present(ClientPid) -> 1 -> true end. -announce_client(TargetCluster, Pid) -> - true = gproc:reg_other(?CLIENT_NAME(TargetCluster), Pid), +announce_client(Actor, TargetCluster, Pid) -> + Name = + case Actor of + ?PS_ACTOR -> ?PS_CLIENT_NAME(TargetCluster); + _ -> ?CLIENT_NAME(TargetCluster) + end, + true = gproc:reg_other(Name, Pid), ok. publish_routes(ClientPid, Actor, Incarnation, Updates) -> @@ -148,19 +175,17 @@ publish_routes(ClientPid, Actor, Incarnation, Updates) -> %% Route syncer -start_syncer(TargetCluster) -> - case supervisor:start_child(?REF(TargetCluster), child_spec(syncer, TargetCluster)) of +start_syncer(TargetCluster, Actor, Incr) -> + Spec = child_spec(syncer, Actor, Incr, TargetCluster), + case supervisor:start_child(?REF(TargetCluster), Spec) of {ok, _} -> ok; {error, {already_started, _}} -> ok end. -start_link_syncer(TargetCluster) -> - Actor = get_actor_id(), - Incarnation = get_actor_incarnation(), - ClientName = ?CLIENT_NAME(TargetCluster), - emqx_router_syncer:start_link(?SYNCER_REF(TargetCluster), #{ +start_link_syncer(Actor, Incarnation, SyncerRef, ClientName) -> + emqx_router_syncer:start_link(SyncerRef, #{ max_batch_size => ?MAX_BATCH_SIZE, min_sync_interval => ?MIN_SYNC_INTERVAL, error_delay => ?ERROR_DELAY, @@ -169,10 +194,14 @@ start_link_syncer(TargetCluster) -> %% TODO: enable_replies => false }). 
-close_syncer(TargetCluster) -> +close_syncer(TargetCluster, ?PS_ACTOR) -> + emqx_router_syncer:close(?PS_SYNCER_REF(TargetCluster)); +close_syncer(TargetCluster, _Actor) -> emqx_router_syncer:close(?SYNCER_REF(TargetCluster)). -open_syncer(TargetCluster) -> +open_syncer(TargetCluster, ?PS_ACTOR) -> + emqx_router_syncer:open(?PS_SYNCER_REF(TargetCluster)); +open_syncer(TargetCluster, _Actor) -> emqx_router_syncer:open(?SYNCER_REF(TargetCluster)). process_syncer_batch(Batch, ClientName, Actor, Incarnation) -> @@ -200,7 +229,8 @@ init({sup, TargetCluster}) -> period => 60 }, Children = [ - child_spec(actor, TargetCluster) + child_spec(actor, TargetCluster), + child_spec(ps_actor, TargetCluster) ], {ok, {SupFlags, Children}}; init({actor, State}) -> @@ -212,20 +242,37 @@ child_spec(actor, TargetCluster) -> %% ClientID: `mycluster:emqx1@emqx.local:routesync` %% Occasional TCP/MQTT-level disconnects are expected, and should be handled %% gracefully. - #{ - id => actor, - start => {?MODULE, start_link_actor, [TargetCluster]}, - restart => permanent, - type => worker - }; -child_spec(syncer, TargetCluster) -> + Actor = get_actor_id(), + Incarnation = ensure_actor_incarnation(), + actor_spec(actor, ?ACTOR_REF(TargetCluster), Actor, Incarnation, TargetCluster); +child_spec(ps_actor, TargetCluster) -> + actor_spec(ps_actor, ?PS_ACTOR_REF(TargetCluster), ?PS_ACTOR, ?PS_INCARNATION, TargetCluster). + +child_spec(syncer, ?PS_ACTOR, Incarnation, TargetCluster) -> + SyncerRef = ?PS_SYNCER_REF(TargetCluster), + ClientName = ?PS_CLIENT_NAME(TargetCluster), + syncer_spec(ps_syncer, ?PS_ACTOR, Incarnation, SyncerRef, ClientName); +child_spec(syncer, Actor, Incarnation, TargetCluster) -> %% Route syncer process. %% Initially starts in a "closed" state. Actor decides when to open it, i.e. %% when bootstrapping is done. Syncer crash means re-bootstrap is needed, so %% we just restart the actor in this case. 
+ SyncerRef = ?SYNCER_REF(TargetCluster), + ClientName = ?CLIENT_NAME(TargetCluster), + syncer_spec(syncer, Actor, Incarnation, SyncerRef, ClientName). + +actor_spec(ChildID, ActorRef, Actor, Incarnation, TargetCluster) -> #{ - id => syncer, - start => {?MODULE, start_link_syncer, [TargetCluster]}, + id => ChildID, + start => {?MODULE, start_link_actor, [ActorRef, Actor, Incarnation, TargetCluster]}, + restart => permanent, + type => worker + }. + +syncer_spec(ChildID, Actor, Incarnation, SyncerRef, ClientName) -> + #{ + id => ChildID, + start => {?MODULE, start_link_syncer, [Actor, Incarnation, SyncerRef, ClientName]}, restart => permanent, type => worker }. @@ -274,12 +321,12 @@ terminate(_Reason, _State) -> ok. process_connect(St = #st{target = TargetCluster, actor = Actor, incarnation = Incr}) -> - case start_link_client(TargetCluster) of + case start_link_client(TargetCluster, Actor) of {ok, ClientPid} -> %% TODO: error handling, handshake {ok, _} = emqx_cluster_link_mqtt:publish_actor_init_sync(ClientPid, Actor, Incr), - ok = start_syncer(TargetCluster), - ok = announce_client(TargetCluster, ClientPid), + ok = start_syncer(TargetCluster, Actor, Incr), + ok = announce_client(Actor, TargetCluster, ClientPid), process_bootstrap(St#st{client = ClientPid}); {error, Reason} -> handle_connect_error(Reason, St) @@ -290,9 +337,9 @@ handle_connect_error(_Reason, St) -> TRef = erlang:start_timer(?RECONNECT_TIMEOUT, self(), reconnect), St#st{reconnect_timer = TRef}. -handle_client_down(_Reason, St = #st{target = TargetCluster}) -> +handle_client_down(_Reason, St = #st{target = TargetCluster, actor = Actor}) -> %% TODO: logs - ok = close_syncer(TargetCluster), + ok = close_syncer(TargetCluster, Actor), process_connect(St#st{client = undefined}). process_bootstrap(St = #st{bootstrapped = false}) -> @@ -311,6 +358,15 @@ process_bootstrap(St = #st{client = ClientPid, bootstrapped = true}) -> %% is re-established with a clean session. 
Once bootstrapping is done, it %% opens the syncer. +run_bootstrap(St = #st{target = TargetCluster, actor = ?PS_ACTOR}) -> + case mria_config:whoami() of + Role when Role /= replicant -> + Opts = #{is_persistent_route => true}, + Bootstrap = emqx_cluster_link_router_bootstrap:init(TargetCluster, Opts), + run_bootstrap(Bootstrap, St); + _ -> + process_bootstrapped(St) + end; run_bootstrap(St = #st{target = TargetCluster}) -> Bootstrap = emqx_cluster_link_router_bootstrap:init(TargetCluster, #{}), run_bootstrap(Bootstrap, St). @@ -330,8 +386,8 @@ run_bootstrap(Bootstrap, St) -> end end. -process_bootstrapped(St = #st{target = TargetCluster}) -> - ok = open_syncer(TargetCluster), +process_bootstrapped(St = #st{target = TargetCluster, actor = Actor}) -> + ok = open_syncer(TargetCluster, Actor), St#st{bootstrapped = true}. process_bootstrap_batch(Batch, #st{client = ClientPid, actor = Actor, incarnation = Incarnation}) ->