From 7df91d852c7f8c790a99e34579c92b8715295d06 Mon Sep 17 00:00:00 2001 From: Serge Tupchii Date: Wed, 15 May 2024 19:18:57 +0300 Subject: [PATCH] feat(clusterlink): integrate node local syncer/actor implementation - support multiple cluster links in extrouter - apply extrouter ops on 'message.publish' hook - fix several minor bugs --- .../src/emqx_cluster_link.erl | 36 +++++--- .../src/emqx_cluster_link_app.erl | 5 +- .../src/emqx_cluster_link_extrouter.erl | 92 ++++++++++++------- .../src/emqx_cluster_link_mqtt.erl | 51 ++++++++-- .../src/emqx_cluster_link_router_syncer.erl | 32 +++++-- .../src/emqx_cluster_link_sup.erl | 10 +- 6 files changed, 154 insertions(+), 72 deletions(-) diff --git a/apps/emqx_cluster_link/src/emqx_cluster_link.erl b/apps/emqx_cluster_link/src/emqx_cluster_link.erl index f0b0c95ba..ae3647d4a 100644 --- a/apps/emqx_cluster_link/src/emqx_cluster_link.erl +++ b/apps/emqx_cluster_link/src/emqx_cluster_link.erl @@ -62,15 +62,10 @@ should_route_to_external_dests(_Msg) -> on_message_publish(#message{topic = <>, payload = Payload}) -> _ = case emqx_cluster_link_mqtt:decode_route_op(Payload) of - {add, Topics} when is_list(Topics) -> - add_routes(Topics, ClusterName); - {add, Topic} -> - emqx_router_syncer:push(add, Topic, ?DEST(ClusterName), #{}); - {delete, _} -> - %% Not implemented yet - ok; - cleanup_routes -> - cleanup_routes(ClusterName) + {actor_init, #{actor := Actor, incarnation := Incr}} -> + actor_init(ClusterName, Actor, Incr); + {route_updates, #{actor := Actor, incarnation := Incr}, RouteOps} -> + update_routes(ClusterName, Actor, Incr, RouteOps) end, {stop, []}; on_message_publish(#message{topic = <>, payload = Payload}) -> @@ -110,6 +105,19 @@ delete_hook() -> %% Internal functions %%-------------------------------------------------------------------- +actor_init(ClusterName, Actor, Incarnation) -> + Env = #{timestamp => erlang:system_time(millisecond)}, + {ok, _} = emqx_cluster_link_extrouter:actor_init(ClusterName, Actor, Incarnation, Env). + +update_routes(ClusterName, Actor, Incarnation, RouteOps) -> + ActorState = emqx_cluster_link_extrouter:actor_state(ClusterName, Actor, Incarnation), + lists:foreach( + fun(RouteOp) -> + emqx_cluster_link_extrouter:actor_apply_operation(RouteOp, ActorState) + end, + RouteOps + ). + cleanup_routes(ClusterName) -> emqx_router:cleanup_routes(?DEST(ClusterName)). @@ -142,11 +150,11 @@ on_init_ack(Res, ClusterName, Msg) -> #{'Correlation-Data' := ReqId} = emqx_message:get_header(properties, Msg), emqx_cluster_link_coordinator:on_link_ack(ClusterName, ReqId, Res). -add_routes(Topics, ClusterName) -> - lists:foreach( - fun(T) -> emqx_router_syncer:push(add, T, ?DEST(ClusterName), #{}) end, - Topics - ). +%% add_routes(Topics, ClusterName) -> +%% lists:foreach( +%% fun(T) -> emqx_router_syncer:push(add, T, ?DEST(ClusterName), #{}) end, +%% Topics +%% ). %% let it crash if extra is not a map, %% we don't expect the message to be forwarded from an older EMQX release, diff --git a/apps/emqx_cluster_link/src/emqx_cluster_link_app.erl b/apps/emqx_cluster_link/src/emqx_cluster_link_app.erl index 68dc07f48..f05c5c1a0 100644 --- a/apps/emqx_cluster_link/src/emqx_cluster_link_app.erl +++ b/apps/emqx_cluster_link/src/emqx_cluster_link_app.erl @@ -11,6 +11,7 @@ -define(BROKER_MOD, emqx_cluster_link). start(_StartType, _StartArgs) -> + ok = mria:wait_for_tables(emqx_cluster_link_extrouter:create_tables()), emqx_cluster_link_config:add_handler(), LinksConf = enabled_links(), _ = @@ -31,7 +32,7 @@ prep_stop(State) -> stop(_State) -> _ = emqx_cluster_link:delete_hook(), _ = emqx_cluster_link:unregister_external_broker(), - _ = stop_msg_fwd_resources(emqx:get_config([cluster, links], [])), + _ = stop_msg_fwd_resources(emqx_cluster_link_config:links()), ok. %%-------------------------------------------------------------------- @@ -41,7 +42,7 @@ stop(_State) -> enabled_links() -> lists:filter( fun(#{enable := IsEnabled}) -> IsEnabled =:= true end, - emqx:get_config([cluster, links], []) + emqx_cluster_link_config:links() ). start_msg_fwd_resources(LinksConf) -> diff --git a/apps/emqx_cluster_link/src/emqx_cluster_link_extrouter.erl b/apps/emqx_cluster_link/src/emqx_cluster_link_extrouter.erl index 76999f4cf..a09d4d8de 100644 --- a/apps/emqx_cluster_link/src/emqx_cluster_link_extrouter.erl +++ b/apps/emqx_cluster_link/src/emqx_cluster_link_extrouter.erl @@ -15,14 +15,16 @@ %% Actor API -export([ - actor_init/3, + actor_init/4, + actor_state/3, + actor_apply_operation/2, actor_apply_operation/3, actor_gc/1 ]). %% Internal API -export([ - mnesia_actor_init/3, + mnesia_actor_init/4, mnesia_actor_heartbeat/3, mnesia_clean_incarnation/1, apply_actor_operation/5 @@ -31,6 +33,9 @@ %% Strictly monotonically increasing integer. -type smint() :: integer(). +%% Remote cluster name +-type cluster() :: binary(). + %% Actor. %% Identifies an independent route replication actor on the remote broker. %% Usually something like `node()` or `{node(), _Shard}`. @@ -45,17 +50,18 @@ %% Operation. %% RouteID should come in handy when two or more different routes on the actor side %% are "intersected" to the same topic filter that needs to be replicated here. --type op() :: {add | del, _TopicFilter :: binary(), _RouteID} | heartbeat. +-type op() :: {add | delete, {_TopicFilter :: binary(), _RouteID}} | heartbeat. %% Basically a bit offset. %% Each actor + incarnation pair occupies a separate lane in the multi-counter. %% Example: %% Actors | n1@ds n2@ds n3@ds %% Lanes | 0 1 2 -%% Op1 | n3@ds add client/42/# → MCounter += 1 bsl 2 = 4 -%% Op2 | n2@ds add client/42/# → MCounter += 1 bsl 1 = 6 -%% Op3 | n3@ds del client/42/# → MCounter -= 1 bsl 2 = 2 -%% Op4 | n2@ds del client/42/# → MCounter -= 1 bsl 1 = 0 → route deleted +%% --------------------------- +%% Op1 | n3@ds add client/42/# → MCounter += 1 bsl 2 = 4 +%% Op2 | n2@ds add client/42/# → MCounter += 1 bsl 1 = 6 +%% Op3 | n3@ds delete client/42/# → MCounter -= 1 bsl 2 = 2 +%% Op4 | n2@ds delete client/42/# → MCounter -= 1 bsl 1 = 0 → route deleted -type lane() :: non_neg_integer(). -define(DEFAULT_ACTOR_TTL_MS, 30_000). @@ -64,13 +70,16 @@ -define(EXTROUTE_TAB, emqx_external_router_route). -define(EXTROUTE_ACTOR_TAB, emqx_external_router_actor). +-define(ACTOR_ID(Cluster, Actor), {Cluster, Actor}). +-define(ROUTE_ID(Cluster, RouteID), {Cluster, RouteID}). + -record(extroute, { entry :: emqx_topic_index:key(_RouteID), mcounter = 0 :: non_neg_integer() }). -record(actor, { - id :: actor(), + id :: {cluster(), actor()}, incarnation :: incarnation(), lane :: lane(), until :: _Timestamp @@ -102,7 +111,7 @@ create_tables() -> ]} ]} ]), - [?EXTROUTE_TAB]. + [?EXTROUTE_ACTOR_TAB, ?EXTROUTE_TAB]. %% @@ -124,6 +133,7 @@ match_to_route(M) -> %% -record(state, { + cluster :: cluster(), actor :: actor(), incarnation :: incarnation(), lane :: lane() | undefined @@ -133,19 +143,19 @@ match_to_route(M) -> -type env() :: #{timestamp := _Milliseconds}. --spec actor_init(actor(), incarnation(), env()) -> {ok, state()}. -actor_init(Actor, Incarnation, Env = #{timestamp := Now}) -> +-spec actor_init(cluster(), actor(), incarnation(), env()) -> {ok, state()}. +actor_init(Cluster, Actor, Incarnation, Env = #{timestamp := Now}) -> %% TODO: Rolling upgrade safety? - case transaction(fun ?MODULE:mnesia_actor_init/3, [Actor, Incarnation, Now]) of + case transaction(fun ?MODULE:mnesia_actor_init/4, [Cluster, Actor, Incarnation, Now]) of {ok, State} -> {ok, State}; {reincarnate, Rec} -> %% TODO: Do this asynchronously. ok = clean_incarnation(Rec), - actor_init(Actor, Incarnation, Env) + actor_init(Cluster, Actor, Incarnation, Env) end. -mnesia_actor_init(Actor, Incarnation, TS) -> +mnesia_actor_init(Cluster, Actor, Incarnation, TS) -> %% NOTE %% We perform this heavy-weight transaction only in the case of a new route %% replication connection. The implicit assumption is that each replication @@ -154,15 +164,15 @@ mnesia_actor_init(Actor, Incarnation, TS) -> %% ClientID. There's always a chance of having stray process severely lagging %% that applies some update out of the blue, but it seems impossible to prevent %% it completely w/o transactions. - State = #state{actor = Actor, incarnation = Incarnation}, + State = #state{cluster = Cluster, actor = Actor, incarnation = Incarnation}, case mnesia:read(?EXTROUTE_ACTOR_TAB, Actor, write) of [#actor{incarnation = Incarnation, lane = Lane} = Rec] -> ok = mnesia:write(?EXTROUTE_ACTOR_TAB, Rec#actor{until = bump_actor_ttl(TS)}, write), {ok, State#state{lane = Lane}}; [] -> - Lane = mnesia_assign_lane(), + Lane = mnesia_assign_lane(Cluster), Rec = #actor{ - id = Actor, + id = ?ACTOR_ID(Cluster, Actor), incarnation = Incarnation, lane = Lane, until = bump_actor_ttl(TS) @@ -175,21 +185,32 @@ mnesia_actor_init(Actor, Incarnation, TS) -> mnesia:abort({outdated_incarnation_actor, Actor, Incarnation, Newer}) end. +-spec actor_state(cluster(), actor(), incarnation()) -> state(). +actor_state(Cluster, Actor, Incarnation) -> + ActorID = ?ACTOR_ID(Cluster, Actor), + [#actor{lane = Lane}] = mnesia:dirty_read(?EXTROUTE_ACTOR_TAB, ActorID), + #state{cluster = Cluster, actor = Actor, incarnation = Incarnation, lane = Lane}. + +-spec actor_apply_operation(op(), state()) -> state(). +actor_apply_operation(Op, State) -> + actor_apply_operation(Op, State, #{}). + -spec actor_apply_operation(op(), state(), env()) -> state(). actor_apply_operation( - {OpName, TopicFilter, ID}, - State = #state{actor = Actor, incarnation = Incarnation, lane = Lane}, + {OpName, {TopicFilter, ID}}, + State = #state{cluster = Cluster, actor = Actor, incarnation = Incarnation, lane = Lane}, _Env ) -> - Entry = emqx_topic_index:make_key(TopicFilter, ID), + ActorID = ?ACTOR_ID(Cluster, Actor), + Entry = emqx_topic_index:make_key(TopicFilter, ?ROUTE_ID(Cluster, ID)), case mria_config:whoami() of Role when Role /= replicant -> - apply_actor_operation(Actor, Incarnation, Entry, OpName, Lane); + apply_actor_operation(ActorID, Incarnation, Entry, OpName, Lane); replicant -> mria:async_dirty( ?EXTROUTE_SHARD, fun ?MODULE:apply_actor_operation/5, - [Actor, Incarnation, Entry, OpName, Lane] + [ActorID, Incarnation, Entry, OpName, Lane] ) end, State; @@ -201,8 +222,8 @@ actor_apply_operation( ok = transaction(fun ?MODULE:mnesia_actor_heartbeat/3, [Actor, Incarnation, Now]), State. -apply_actor_operation(Actor, Incarnation, Entry, OpName, Lane) -> - _ = assert_current_incarnation(Actor, Incarnation), +apply_actor_operation(ActorID, Incarnation, Entry, OpName, Lane) -> + _ = assert_current_incarnation(ActorID, Incarnation), apply_operation(Entry, OpName, Lane). apply_operation(Entry, OpName, Lane) -> @@ -232,7 +253,7 @@ apply_operation(Entry, MCounter, OpName, Lane) -> Marker when OpName =:= add -> %% Already added. MCounter; - Marker when OpName =:= del -> + Marker when OpName =:= delete -> case mria:dirty_update_counter(?EXTROUTE_TAB, Entry, -Marker) of 0 -> Record = #extroute{entry = Entry, mcounter = 0}, @@ -241,7 +262,7 @@ apply_operation(Entry, MCounter, OpName, Lane) -> C -> C end; - 0 when OpName =:= del -> + 0 when OpName =:= delete -> %% Already deleted. MCounter end. @@ -257,18 +278,19 @@ actor_gc(#{timestamp := Now}) -> ok end. -mnesia_assign_lane() -> - Assignment = mnesia:foldl( - fun(#actor{lane = Lane}, Acc) -> - Acc bor (1 bsl Lane) - end, +mnesia_assign_lane(Cluster) -> + Assignment = lists:foldl( + fun(Lane, Acc) -> Acc bor (1 bsl Lane) end, 0, - ?EXTROUTE_ACTOR_TAB, - write + select_cluster_lanes(Cluster) ), Lane = first_zero_bit(Assignment), Lane. +select_cluster_lanes(Cluster) -> + MS = [{#actor{id = {Cluster, '_'}, lane = '$1', _ = '_'}, [], ['$1']}], + mnesia:select(?EXTROUTE_ACTOR_TAB, MS, write). + mnesia_actor_heartbeat(Actor, Incarnation, TS) -> case mnesia:read(?EXTROUTE_ACTOR_TAB, Actor, write) of [#actor{incarnation = Incarnation} = Rec] -> @@ -300,13 +322,13 @@ clean_lane(Lane) -> ?EXTROUTE_TAB ). -assert_current_incarnation(Actor, Incarnation) -> +assert_current_incarnation(ActorID, Incarnation) -> %% NOTE %% Ugly, but should not really happen anyway. This is a safety net for the case %% when this process tries to apply some outdated operation for whatever reason %% (e.g. heavy CPU starvation). Still, w/o transactions, it's just a best-effort %% attempt. - [#actor{incarnation = Incarnation}] = mnesia:dirty_read(?EXTROUTE_ACTOR_TAB, Actor), + [#actor{incarnation = Incarnation}] = mnesia:dirty_read(?EXTROUTE_ACTOR_TAB, ActorID), ok. %% diff --git a/apps/emqx_cluster_link/src/emqx_cluster_link_mqtt.erl b/apps/emqx_cluster_link/src/emqx_cluster_link_mqtt.erl index b111be954..3b16642ac 100644 --- a/apps/emqx_cluster_link/src/emqx_cluster_link_mqtt.erl +++ b/apps/emqx_cluster_link/src/emqx_cluster_link_mqtt.erl @@ -45,6 +45,7 @@ ]). -export([ + publish_actor_init_sync/3, publish_route_sync/4, encode_field/2 ]). @@ -58,7 +59,7 @@ -define(CLIENTID(Base, Suffix), emqx_bridge_mqtt_lib:clientid_base([Base, Suffix])). -define(MQTT_HOST_OPTS, #{default_port => 1883}). --define(MY_CLUSTER_NAME, atom_to_binary(emqx_config:get([cluster, name]))). +-define(MY_CLUSTER_NAME, emqx_cluster_link_config:cluster()). -define(ROUTE_TOPIC, <>). -define(MSG_FWD_TOPIC, <>). @@ -90,6 +91,7 @@ -define(F_OPERATION, '$op'). -define(OP_ROUTE, <<"route">>). +-define(OP_ACTOR_INIT, <<"actor_init">>). -define(F_ACTOR, 10). -define(F_INCARNATION, 11). @@ -403,6 +405,18 @@ publish_result(Caller, Ref, Result) -> Caller ! {pub_result, Ref, Err} end. +%%% New leader-less Syncer/Actor implementation + +publish_actor_init_sync(ClientPid, Actor, Incarnation) -> + %% TODO: handshake (request / response) to make sure the link is established + PubTopic = ?ROUTE_TOPIC, + Payload = #{ + ?F_OPERATION => ?OP_ACTOR_INIT, + ?F_ACTOR => Actor, + ?F_INCARNATION => Incarnation + }, + emqtt:publish(ClientPid, PubTopic, ?ENCODE(Payload), ?QOS_1). + publish_route_sync(ClientPid, Actor, Incarnation, Updates) -> PubTopic = ?ROUTE_TOPIC, Payload = #{ @@ -473,14 +487,28 @@ decode_ctrl_msg1(#{<<"op">> := ?UNLINK_OP}, _ClusterName) -> decode_route_op(Payload) -> decode_route_op1(?DECODE(Payload)). -decode_route_op1(<<"add_", Topic/binary>>) -> - {add, Topic}; -decode_route_op1(<<"delete_", Topic/binary>>) -> - {delete, Topic}; -decode_route_op1(#{<<"op">> := ?BATCH_ROUTES_OP, <<"topics">> := Topics}) when is_list(Topics) -> - {add, Topics}; -decode_route_op1(#{<<"op">> := ?CLEANUP_ROUTES_OP}) -> - cleanup_routes; +decode_route_op1(#{ + ?F_OPERATION := ?OP_ACTOR_INIT, + ?F_ACTOR := Actor, + ?F_INCARNATION := Incr +}) -> + {actor_init, #{actor => Actor, incarnation => Incr}}; +decode_route_op1(#{ + ?F_OPERATION := ?OP_ROUTE, + ?F_ACTOR := Actor, + ?F_INCARNATION := Incr, + ?F_ROUTES := RouteOps +}) -> + RouteOps1 = lists:map(fun(Op) -> decode_field(route, Op) end, RouteOps), + {route_updates, #{actor => Actor, incarnation => Incr}, RouteOps1}; +%%decode_route_op1(<<"add_", Topic/binary>>) -> +%% {add, Topic}; +%%decode_route_op1(<<"delete_", Topic/binary>>) -> +%% {delete, Topic}; +%%decode_route_op1(#{<<"op">> := ?BATCH_ROUTES_OP, <<"topics">> := Topics}) when is_list(Topics) -> +%% {add, Topics}; +%%decode_route_op1(#{<<"op">> := ?CLEANUP_ROUTES_OP}) -> +%% cleanup_routes; decode_route_op1(Payload) -> ?SLOG(warning, #{ msg => "unexpected_cluster_link_route_op_payload", @@ -528,6 +556,11 @@ encode_field(route, {add, Route = {_Topic, _ID}}) -> encode_field(route, {delete, {Topic, ID}}) -> {?ROUTE_DELETE, Topic, ID}. +decode_field(route, {?ROUTE_DELETE, Route = {_Topic, _ID}}) -> + {delete, Route}; +decode_field(route, Route = {_Topic, _ID}) -> + {add, Route}. + %%-------------------------------------------------------------------- %% emqx_external_broker %%-------------------------------------------------------------------- diff --git a/apps/emqx_cluster_link/src/emqx_cluster_link_router_syncer.erl b/apps/emqx_cluster_link/src/emqx_cluster_link_router_syncer.erl index 48dda2e2d..d432cd7d8 100644 --- a/apps/emqx_cluster_link/src/emqx_cluster_link_router_syncer.erl +++ b/apps/emqx_cluster_link/src/emqx_cluster_link_router_syncer.erl @@ -19,7 +19,8 @@ process_syncer_batch/4 ]). --behaviour(supervisor). +%% silence warning +%% -behaviour(supervisor). -export([init/1]). -behaviour(gen_server). @@ -99,7 +100,17 @@ ensure_actor_incarnation() -> start_link_client(TargetCluster) -> Options = emqx_cluster_link_config:emqtt_options(TargetCluster), - emqtt:start_link(refine_client_options(Options)). + case emqtt:start_link(refine_client_options(Options)) of + {ok, Pid} -> + case emqtt:connect(Pid) of + {ok, _Props} -> + {ok, Pid}; + Error -> + Error + end; + Error -> + Error + end. refine_client_options(Options = #{clientid := ClientID}) -> %% TODO: Reconnect should help, but it looks broken right now. @@ -180,7 +191,7 @@ batch_get_opname(Op) -> init({sup, TargetCluster}) -> %% FIXME: Intensity. SupFlags = #{ - strategy => all_for_one, + strategy => one_for_all, intensity => 10, period => 60 }, @@ -239,7 +250,7 @@ init_actor(State = #st{}) -> {ok, State, {continue, connect}}. handle_continue(connect, State) -> - process_connect(State). + {noreply, process_connect(State)}. handle_call(_Request, _From, State) -> {reply, ignored, State}. @@ -248,9 +259,9 @@ handle_cast(_Request, State) -> {noreply, State}. handle_info({'EXIT', ClientPid, Reason}, St = #st{client = ClientPid}) -> - handle_client_down(Reason, St); + {noreply, handle_client_down(Reason, St)}; handle_info({timeout, TRef, _Reconnect}, St = #st{reconnect_timer = TRef}) -> - process_connect(St#st{reconnect_timer = undefined}); + {noreply, process_connect(St#st{reconnect_timer = undefined})}; handle_info(_Info, St) -> %% TODO: log? {noreply, St}. @@ -258,22 +269,25 @@ handle_info(_Info, St) -> terminate(_Reason, _State) -> ok. -process_connect(St = #st{actor = TargetCluster}) -> +process_connect(St = #st{target = TargetCluster, actor = Actor, incarnation = Incr}) -> case start_link_client(TargetCluster) of {ok, ClientPid} -> ok = start_syncer(TargetCluster), ok = announce_client(TargetCluster, ClientPid), + %% TODO: error handling, handshake + + {ok, _} = emqx_cluster_link_mqtt:publish_actor_init_sync(ClientPid, Actor, Incr), process_bootstrap(St#st{client = ClientPid}); {error, Reason} -> handle_connect_error(Reason, St) end. -handle_connect_error(Reason, St) -> +handle_connect_error(_Reason, St) -> %% TODO: logs TRef = erlang:start_timer(?RECONNECT_TIMEOUT, self(), reconnect), St#st{reconnect_timer = TRef}. -handle_client_down(Reason, St = #st{target = TargetCluster}) -> +handle_client_down(_Reason, St = #st{target = TargetCluster}) -> %% TODO: logs ok = close_syncer(TargetCluster), process_connect(St#st{client = undefined}). diff --git a/apps/emqx_cluster_link/src/emqx_cluster_link_sup.erl b/apps/emqx_cluster_link/src/emqx_cluster_link_sup.erl index c98b9f4c5..beb641a92 100644 --- a/apps/emqx_cluster_link/src/emqx_cluster_link_sup.erl +++ b/apps/emqx_cluster_link/src/emqx_cluster_link_sup.erl @@ -22,13 +22,17 @@ init(LinksConf) -> intensity => 10, period => 5 }, - Children = [sup_spec(?COORD_SUP, ?COORD_SUP, LinksConf)], + %% Children = [sup_spec(?COORD_SUP, ?COORD_SUP, LinksConf)], + Children = [ + sup_spec(Name, emqx_cluster_link_router_syncer, [Name]) + || #{upstream := Name} <- LinksConf + ], {ok, {SupFlags, Children}}. -sup_spec(Id, Mod, Conf) -> +sup_spec(Id, Mod, Args) -> #{ id => Id, - start => {Mod, start_link, [Conf]}, + start => {Mod, start_link, Args}, restart => permanent, shutdown => infinity, type => supervisor,