From cbd01ae8182824138510fd0edebf6a6cccff1922 Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Fri, 10 May 2024 15:30:01 +0200 Subject: [PATCH] feat(clusterlink): add node-local route sync actor implementation --- .../src/emqx_cluster_link_config.erl | 59 ++++ .../src/emqx_cluster_link_mqtt.erl | 30 ++ .../emqx_cluster_link_router_bootstrap.erl | 83 +++++ .../src/emqx_cluster_link_router_syncer.erl | 321 ++++++++++++++++++ apps/emqx_utils/src/emqx_utils.erl | 4 + 5 files changed, 497 insertions(+) create mode 100644 apps/emqx_cluster_link/src/emqx_cluster_link_router_bootstrap.erl create mode 100644 apps/emqx_cluster_link/src/emqx_cluster_link_router_syncer.erl diff --git a/apps/emqx_cluster_link/src/emqx_cluster_link_config.erl b/apps/emqx_cluster_link/src/emqx_cluster_link_config.erl index ade3a8c97..bdbb702ca 100644 --- a/apps/emqx_cluster_link/src/emqx_cluster_link_config.erl +++ b/apps/emqx_cluster_link/src/emqx_cluster_link_config.erl @@ -11,6 +11,18 @@ -define(LINKS_PATH, [cluster, links]). -define(CERTS_PATH(LinkName), filename:join(["cluster", "links", LinkName])). +-define(MQTT_HOST_OPTS, #{default_port => 1883}). + +-export([ + %% General + cluster/0, + links/0, + link/1, + topic_filters/1, + %% Connections + emqtt_options/1 +]). + -export([ add_handler/0, remove_handler/0 @@ -21,6 +33,53 @@ post_config_update/5 ]). +%% + +cluster() -> + atom_to_binary(emqx_config:get([cluster, name])). + +links() -> + emqx:get_config(?LINKS_PATH, []). + +link(Name) -> + case lists:dropwhile(fun(L) -> Name =/= upstream_name(L) end, links()) of + [LinkConf | _] -> LinkConf; + [] -> undefined + end. + +emqtt_options(LinkName) -> + emqx_maybe:apply(fun mk_emqtt_options/1, ?MODULE:link(LinkName)). + +topic_filters(LinkName) -> + maps:get(filters, ?MODULE:link(LinkName), []). + +%% + +mk_emqtt_options(#{server := Server, ssl := #{enable := EnableSsl} = Ssl} = LinkConf) -> + ClientId = maps:get(client_id, LinkConf, cluster()), + #{hostname := Host, port := Port} = emqx_schema:parse_server(Server, ?MQTT_HOST_OPTS), + Opts = #{ + host => Host, + port => Port, + clientid => ClientId, + proto_ver => v5, + ssl => EnableSsl, + ssl_opts => maps:to_list(maps:remove(enable, Ssl)) + }, + with_password(with_user(Opts, LinkConf), LinkConf). + +with_user(Opts, #{username := U} = _LinkConf) -> + Opts#{username => U}; +with_user(Opts, _LinkConf) -> + Opts. + +with_password(Opts, #{password := P} = _LinkConf) -> + Opts#{password => emqx_secret:unwrap(P)}; +with_password(Opts, _LinkConf) -> + Opts. + +%% + add_handler() -> ok = emqx_config_handler:add_handler(?LINKS_PATH, ?MODULE). diff --git a/apps/emqx_cluster_link/src/emqx_cluster_link_mqtt.erl b/apps/emqx_cluster_link/src/emqx_cluster_link_mqtt.erl index 1e9310aca..b111be954 100644 --- a/apps/emqx_cluster_link/src/emqx_cluster_link_mqtt.erl +++ b/apps/emqx_cluster_link/src/emqx_cluster_link_mqtt.erl @@ -44,6 +44,11 @@ decode_forwarded_msg/1 ]). +-export([ + publish_route_sync/4, + encode_field/2 +]). + -export([ forward/2 ]). @@ -79,9 +84,19 @@ %% It's worth optimizing non-batch op payload size, %% thus it's encoded as a plain binary -define(TOPIC_WITH_OP(Op, Topic), <>). + -define(DECODE(Payload), erlang:binary_to_term(Payload, [safe])). -define(ENCODE(Payload), erlang:term_to_binary(Payload)). +-define(F_OPERATION, '$op'). +-define(OP_ROUTE, <<"route">>). + +-define(F_ACTOR, 10). +-define(F_INCARNATION, 11). +-define(F_ROUTES, 12). + +-define(ROUTE_DELETE, 100). + -define(PUB_TIMEOUT, 10_000). ensure_msg_fwd_resource(#{upstream := Name, pool_size := PoolSize} = ClusterConf) -> @@ -388,6 +403,16 @@ publish_result(Caller, Ref, Result) -> Caller ! {pub_result, Ref, Err} end. +publish_route_sync(ClientPid, Actor, Incarnation, Updates) -> + PubTopic = ?ROUTE_TOPIC, + Payload = #{ + ?F_OPERATION => ?OP_ROUTE, + ?F_ACTOR => Actor, + ?F_INCARNATION => Incarnation, + ?F_ROUTES => Updates + }, + emqtt:publish(ClientPid, PubTopic, ?ENCODE(Payload), ?QOS_1). + %%-------------------------------------------------------------------- %% Protocol %%-------------------------------------------------------------------- @@ -498,6 +523,11 @@ decode_proto_ver1(ProtoVer) -> %% Let it fail (for now), we don't expect invalid data to pass through the linking protocol.. {emqx_utils_conv:int(Major), emqx_utils_conv:int(Minor)}. +encode_field(route, {add, Route = {_Topic, _ID}}) -> + Route; +encode_field(route, {delete, {Topic, ID}}) -> + {?ROUTE_DELETE, Topic, ID}. + %%-------------------------------------------------------------------- %% emqx_external_broker %%-------------------------------------------------------------------- diff --git a/apps/emqx_cluster_link/src/emqx_cluster_link_router_bootstrap.erl b/apps/emqx_cluster_link/src/emqx_cluster_link_router_bootstrap.erl new file mode 100644 index 000000000..8c0e609dc --- /dev/null +++ b/apps/emqx_cluster_link/src/emqx_cluster_link_router_bootstrap.erl @@ -0,0 +1,83 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- +-module(emqx_cluster_link_router_bootstrap). + +-include_lib("emqx/include/emqx_router.hrl"). + +-export([ + init/2, + next_batch/1 +]). + +-define(MAX_BATCH_SIZE, 4000). + +-record(bootstrap, { + target :: _ClusterName :: binary(), + wildcards :: [emqx_types:topic()], + topics :: [emqx_types:topic()], + stash :: [{emqx_types:topic(), _RouteID}], + max_batch_size :: non_neg_integer() +}). + +%% + +init(TargetCluster, Options) -> + LinkFilters = emqx_cluster_link_config:topic_filters(TargetCluster), + {Wildcards, Topics} = lists:partition(fun emqx_topic:wildcard/1, LinkFilters), + #bootstrap{ + target = TargetCluster, + wildcards = Wildcards, + topics = Topics, + stash = [], + max_batch_size = maps:get(max_batch_size, Options, ?MAX_BATCH_SIZE) + }. + +next_batch(B = #bootstrap{stash = S0 = [_ | _], max_batch_size = MBS}) -> + {Batch, Stash} = mk_batch(S0, MBS), + {Batch, B#bootstrap{stash = Stash}}; +next_batch(B = #bootstrap{topics = Topics = [_ | _], stash = []}) -> + Routes = select_routes_by_topics(Topics), + next_batch(B#bootstrap{topics = [], stash = Routes}); +next_batch(B0 = #bootstrap{wildcards = Wildcards = [_ | _], stash = []}) -> + Routes = select_routes_by_wildcards(Wildcards), + next_batch(B0#bootstrap{wildcards = [], stash = Routes}); +next_batch(#bootstrap{topics = [], wildcards = [], stash = []}) -> + done. + +mk_batch(Stash, MaxBatchSize) when length(Stash) =< MaxBatchSize -> + {Stash, []}; +mk_batch(Stash, MaxBatchSize) -> + {Batch, Rest} = lists:split(MaxBatchSize, Stash), + {Batch, Rest}. + +%% + +select_routes_by_topics(Topics) -> + [encode_route(Topic, Topic) || Topic <- Topics, emqx_broker:subscribers(Topic) =/= []]. + +select_routes_by_wildcards(Wildcards) -> + emqx_utils_ets:keyfoldl( + fun(Topic, Acc) -> intersecting_route(Topic, Wildcards) ++ Acc end, + [], + ?SUBSCRIBER + ). + +intersecting_route(Topic, Wildcards) -> + %% TODO: probably nice to validate cluster link topic filters + %% to have no intersections between each other? + case topic_intersect_any(Topic, Wildcards) of + false -> []; + Intersection -> [encode_route(Intersection, Topic)] + end. + +topic_intersect_any(Topic, [LinkFilter | T]) -> + case emqx_topic:intersection(Topic, LinkFilter) of + false -> topic_intersect_any(Topic, T); + TopicOrFilter -> TopicOrFilter + end; +topic_intersect_any(_Topic, []) -> + false. + +encode_route(Topic, RouteID) -> + emqx_cluster_link_mqtt:encode_field(route, {add, {Topic, RouteID}}). diff --git a/apps/emqx_cluster_link/src/emqx_cluster_link_router_syncer.erl b/apps/emqx_cluster_link/src/emqx_cluster_link_router_syncer.erl new file mode 100644 index 000000000..48dda2e2d --- /dev/null +++ b/apps/emqx_cluster_link/src/emqx_cluster_link_router_syncer.erl @@ -0,0 +1,321 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- +-module(emqx_cluster_link_router_syncer). + +-include_lib("emqtt/include/emqtt.hrl"). + +%% API +-export([start_link/1]). +-export([push/4]). + +-export([ + start_link_actor/1, + start_link_syncer/1 +]). + +%% Internal API / Syncer +-export([ + process_syncer_batch/4 +]). + +-behaviour(supervisor). +-export([init/1]). + +-behaviour(gen_server). +-export([ + handle_continue/2, + handle_call/3, + handle_cast/2, + handle_info/2, + terminate/2 +]). + +-define(NAME(Cluster), {n, l, {?MODULE, Cluster}}). +-define(REF(Cluster), {via, gproc, ?NAME(Cluster)}). + +-define(NAME(Cluster, What), {n, l, {?MODULE, Cluster, What}}). +-define(CLIENT_NAME(Cluster), ?NAME(Cluster, client)). +-define(SYNCER_NAME(Cluster), ?NAME(Cluster, syncer)). +-define(SYNCER_REF(Cluster), {via, gproc, ?SYNCER_NAME(Cluster)}). +-define(ACTOR_REF(Cluster), {via, gproc, ?NAME(Cluster, actor)}). + +-define(MAX_BATCH_SIZE, 4000). +-define(MIN_SYNC_INTERVAL, 10). +-define(ERROR_DELAY, 200). + +-define(RECONNECT_TIMEOUT, 5_000). + +%% + +push(TargetCluster, OpName, Topic, ID) -> + case gproc:where(?SYNCER_NAME(TargetCluster)) of + SyncerPid when is_pid(SyncerPid) -> + emqx_router_syncer:push(SyncerPid, OpName, Topic, ID, #{}); + undefined -> + dropped + end. + +%% Supervisor: +%% 1. Actor + MQTT Client +%% 2. Syncer + +start_link(TargetCluster) -> + supervisor:start_link(?REF(TargetCluster), ?MODULE, {sup, TargetCluster}). + +%% Actor + +start_link_actor(TargetCluster) -> + Actor = get_actor_id(), + Incarnation = ensure_actor_incarnation(), + gen_server:start_link( + ?ACTOR_REF(TargetCluster), + ?MODULE, + {actor, mk_state(TargetCluster, Actor, Incarnation)}, + [] + ). + +get_actor_id() -> + atom_to_binary(node()). + +get_actor_incarnation() -> + persistent_term:get({?MODULE, incarnation}). + +set_actor_incarnation(Incarnation) -> + ok = persistent_term:put({?MODULE, incarnation}, Incarnation), + Incarnation. + +ensure_actor_incarnation() -> + try + get_actor_incarnation() + catch + error:badarg -> + %% TODO: Subject to clock skew, need something more robust. + Incarnation = erlang:system_time(millisecond), + set_actor_incarnation(Incarnation) + end. + +%% MQTT Client + +start_link_client(TargetCluster) -> + Options = emqx_cluster_link_config:emqtt_options(TargetCluster), + emqtt:start_link(refine_client_options(Options)). + +refine_client_options(Options = #{clientid := ClientID}) -> + %% TODO: Reconnect should help, but it looks broken right now. + Options#{ + clientid => emqx_utils:format("~s:~s:routesync", [ClientID, node()]), + clean_start => false, + properties => #{'Session-Expiry-Interval' => 60}, + retry_interval => 0 + }. + +client_session_present(ClientPid) -> + Info = emqtt:info(ClientPid), + proplists:get_value(session_present, Info, false). + +announce_client(TargetCluster, Pid) -> + true = gproc:reg_other(?CLIENT_NAME(TargetCluster), Pid), + ok. + +publish_routes(ClientPid, Actor, Incarnation, Updates) -> + try emqx_cluster_link_mqtt:publish_route_sync(ClientPid, Actor, Incarnation, Updates) of + {ok, #{reason_code := RC}} when RC < ?RC_UNSPECIFIED_ERROR -> + #{}; + {ok, #{reason_code_name := RCN}} -> + {error, {mqtt, RCN}}; + {error, Reason} -> + {error, Reason} + catch + exit:Reason -> + {error, {client, ClientPid, Reason}} + end. + +%% Route syncer + +start_syncer(TargetCluster) -> + case supervisor:start_child(?REF(TargetCluster), child_spec(syncer, TargetCluster)) of + {ok, _} -> + ok; + {error, {already_started, _}} -> + ok + end. + +start_link_syncer(TargetCluster) -> + Actor = get_actor_id(), + Incarnation = get_actor_incarnation(), + ClientName = ?CLIENT_NAME(TargetCluster), + emqx_router_syncer:start_link(?SYNCER_REF(TargetCluster), #{ + max_batch_size => ?MAX_BATCH_SIZE, + min_sync_interval => ?MIN_SYNC_INTERVAL, + error_delay => ?ERROR_DELAY, + initial_state => closed, + batch_handler => {?MODULE, process_syncer_batch, [ClientName, Actor, Incarnation]} + %% TODO: enable_replies => false + }). + +close_syncer(TargetCluster) -> + emqx_router_syncer:close(?SYNCER_REF(TargetCluster)). + +open_syncer(TargetCluster) -> + emqx_router_syncer:open(?SYNCER_REF(TargetCluster)). + +process_syncer_batch(Batch, ClientName, Actor, Incarnation) -> + Updates = maps:fold( + fun(Route, Op, Acc) -> + OpName = batch_get_opname(Op), + Entry = emqx_cluster_link_mqtt:encode_field(route, {OpName, Route}), + [Entry | Acc] + end, + [], + Batch + ), + publish_routes(gproc:where(ClientName), Actor, Incarnation, Updates). + +batch_get_opname(Op) -> + element(1, Op). + +%% + +init({sup, TargetCluster}) -> + %% FIXME: Intensity. + SupFlags = #{ + strategy => all_for_one, + intensity => 10, + period => 60 + }, + Children = [ + child_spec(actor, TargetCluster) + ], + {ok, {SupFlags, Children}}; +init({actor, State}) -> + init_actor(State). + +child_spec(actor, TargetCluster) -> + %% Actor process. + %% Wraps MQTT Client process. + %% ClientID: `mycluster:emqx1@emqx.local:routesync` + %% Occasional TCP/MQTT-level disconnects are expected, and should be handled + %% gracefully. + #{ + id => actor, + start => {?MODULE, start_link_actor, [TargetCluster]}, + restart => permanent, + type => worker + }; +child_spec(syncer, TargetCluster) -> + %% Route syncer process. + %% Initially starts in a "closed" state. Actor decides when to open it, i.e. + %% when bootstrapping is done. Syncer crash means re-bootstrap is needed, so + %% we just restart the actor in this case. + #{ + id => syncer, + start => {?MODULE, start_link_syncer, [TargetCluster]}, + restart => permanent, + type => worker + }. + +%% + +-record(st, { + target :: binary(), + actor :: binary(), + incarnation :: non_neg_integer(), + client :: {pid(), reference()}, + bootstrapped :: boolean(), + reconnect_timer :: reference() +}). + +mk_state(TargetCluster, Actor, Incarnation) -> + #st{ + target = TargetCluster, + actor = Actor, + incarnation = Incarnation, + bootstrapped = false + }. + +init_actor(State = #st{}) -> + _ = erlang:process_flag(trap_exit, true), + {ok, State, {continue, connect}}. + +handle_continue(connect, State) -> + process_connect(State). + +handle_call(_Request, _From, State) -> + {reply, ignored, State}. + +handle_cast(_Request, State) -> + {noreply, State}. + +handle_info({'EXIT', ClientPid, Reason}, St = #st{client = ClientPid}) -> + handle_client_down(Reason, St); +handle_info({timeout, TRef, _Reconnect}, St = #st{reconnect_timer = TRef}) -> + process_connect(St#st{reconnect_timer = undefined}); +handle_info(_Info, St) -> + %% TODO: log? + {noreply, St}. + +terminate(_Reason, _State) -> + ok. + +process_connect(St = #st{actor = TargetCluster}) -> + case start_link_client(TargetCluster) of + {ok, ClientPid} -> + ok = start_syncer(TargetCluster), + ok = announce_client(TargetCluster, ClientPid), + process_bootstrap(St#st{client = ClientPid}); + {error, Reason} -> + handle_connect_error(Reason, St) + end. + +handle_connect_error(Reason, St) -> + %% TODO: logs + TRef = erlang:start_timer(?RECONNECT_TIMEOUT, self(), reconnect), + St#st{reconnect_timer = TRef}. + +handle_client_down(Reason, St = #st{target = TargetCluster}) -> + %% TODO: logs + ok = close_syncer(TargetCluster), + process_connect(St#st{client = undefined}). + +process_bootstrap(St = #st{bootstrapped = false}) -> + run_bootstrap(St); +process_bootstrap(St = #st{client = ClientPid, bootstrapped = true}) -> + case client_session_present(ClientPid) of + true -> + process_bootstrapped(St); + false -> + run_bootstrap(St) + end. + +%% Bootstrapping. +%% Responsible for transferring local routing table snapshot to the target +%% cluster. Does so either during the initial startup or when MQTT connection +%% is re-established with a clean session. Once bootstrapping is done, it +%% opens the syncer. + +run_bootstrap(St = #st{target = TargetCluster}) -> + Bootstrap = emqx_cluster_link_router_bootstrap:init(TargetCluster, #{}), + run_bootstrap(Bootstrap, St). + +run_bootstrap(Bootstrap, St) -> + case emqx_cluster_link_router_bootstrap:next_batch(Bootstrap) of + done -> + process_bootstrapped(St); + {Batch, NBootstrap} -> + %% TODO: Better error handling. + case process_bootstrap_batch(Batch, St) of + #{} -> + run_bootstrap(NBootstrap, St); + {error, {client, _, _}} -> + %% Client has exited, let `reconnect` codepath handle it. + St + end + end. + +process_bootstrapped(St = #st{target = TargetCluster}) -> + ok = open_syncer(TargetCluster), + St#st{bootstrapped = true}. + +process_bootstrap_batch(Batch, #st{client = ClientPid, actor = Actor, incarnation = Incarnation}) -> + publish_routes(ClientPid, Actor, Incarnation, Batch). diff --git a/apps/emqx_utils/src/emqx_utils.erl b/apps/emqx_utils/src/emqx_utils.erl index 644ed7ae8..8f41a4919 100644 --- a/apps/emqx_utils/src/emqx_utils.erl +++ b/apps/emqx_utils/src/emqx_utils.erl @@ -65,6 +65,7 @@ flattermap/2, tcp_keepalive_opts/4, format/1, + format/2, format_mfal/2, call_first_defined/1, ntoa/1, @@ -566,6 +567,9 @@ tcp_keepalive_opts(OS, _Idle, _Interval, _Probes) -> format(Term) -> iolist_to_binary(io_lib:format("~0p", [Term])). +format(Fmt, Args) -> + iolist_to_binary(io_lib:format(Fmt, Args)). + %% @doc Helper function for log formatters. -spec format_mfal(map(), map()) -> undefined | binary(). format_mfal(Data, #{with_mfa := true}) ->