feat(clusterlink): integrate node local syncer/actor implementation

- support multiple cluster links in extrouter
 - apply extrouter ops on 'message.publish' hook
 - fix several minor bugs
This commit is contained in:
Serge Tupchii 2024-05-15 19:18:57 +03:00
parent cbd01ae818
commit 7df91d852c
6 changed files with 154 additions and 72 deletions

View File

@ -62,15 +62,10 @@ should_route_to_external_dests(_Msg) ->
on_message_publish(#message{topic = <<?ROUTE_TOPIC_PREFIX, ClusterName/binary>>, payload = Payload}) ->
_ =
case emqx_cluster_link_mqtt:decode_route_op(Payload) of
{add, Topics} when is_list(Topics) ->
add_routes(Topics, ClusterName);
{add, Topic} ->
emqx_router_syncer:push(add, Topic, ?DEST(ClusterName), #{});
{delete, _} ->
%% Not implemented yet
ok;
cleanup_routes ->
cleanup_routes(ClusterName)
{actor_init, #{actor := Actor, incarnation := Incr}} ->
actor_init(ClusterName, Actor, Incr);
{route_updates, #{actor := Actor, incarnation := Incr}, RouteOps} ->
update_routes(ClusterName, Actor, Incr, RouteOps)
end,
{stop, []};
on_message_publish(#message{topic = <<?MSG_TOPIC_PREFIX, ClusterName/binary>>, payload = Payload}) ->
@ -110,6 +105,19 @@ delete_hook() ->
%% Internal functions
%%--------------------------------------------------------------------
actor_init(ClusterName, Actor, Incarnation) ->
Env = #{timestamp => erlang:system_time(millisecond)},
{ok, _} = emqx_cluster_link_extrouter:actor_init(ClusterName, Actor, Incarnation, Env).
update_routes(ClusterName, Actor, Incarnation, RouteOps) ->
ActorState = emqx_cluster_link_extrouter:actor_state(ClusterName, Actor, Incarnation),
lists:foreach(
fun(RouteOp) ->
emqx_cluster_link_extrouter:actor_apply_operation(RouteOp, ActorState)
end,
RouteOps
).
cleanup_routes(ClusterName) ->
emqx_router:cleanup_routes(?DEST(ClusterName)).
@ -142,11 +150,11 @@ on_init_ack(Res, ClusterName, Msg) ->
#{'Correlation-Data' := ReqId} = emqx_message:get_header(properties, Msg),
emqx_cluster_link_coordinator:on_link_ack(ClusterName, ReqId, Res).
add_routes(Topics, ClusterName) ->
lists:foreach(
fun(T) -> emqx_router_syncer:push(add, T, ?DEST(ClusterName), #{}) end,
Topics
).
%% add_routes(Topics, ClusterName) ->
%% lists:foreach(
%% fun(T) -> emqx_router_syncer:push(add, T, ?DEST(ClusterName), #{}) end,
%% Topics
%% ).
%% let it crash if extra is not a map,
%% we don't expect the message to be forwarded from an older EMQX release,

View File

@ -11,6 +11,7 @@
-define(BROKER_MOD, emqx_cluster_link).
start(_StartType, _StartArgs) ->
ok = mria:wait_for_tables(emqx_cluster_link_extrouter:create_tables()),
emqx_cluster_link_config:add_handler(),
LinksConf = enabled_links(),
_ =
@ -31,7 +32,7 @@ prep_stop(State) ->
stop(_State) ->
_ = emqx_cluster_link:delete_hook(),
_ = emqx_cluster_link:unregister_external_broker(),
_ = stop_msg_fwd_resources(emqx:get_config([cluster, links], [])),
_ = stop_msg_fwd_resources(emqx_cluster_link_config:links()),
ok.
%%--------------------------------------------------------------------
@ -41,7 +42,7 @@ stop(_State) ->
enabled_links() ->
lists:filter(
fun(#{enable := IsEnabled}) -> IsEnabled =:= true end,
emqx:get_config([cluster, links], [])
emqx_cluster_link_config:links()
).
start_msg_fwd_resources(LinksConf) ->

View File

@ -15,14 +15,16 @@
%% Actor API
-export([
actor_init/3,
actor_init/4,
actor_state/3,
actor_apply_operation/2,
actor_apply_operation/3,
actor_gc/1
]).
%% Internal API
-export([
mnesia_actor_init/3,
mnesia_actor_init/4,
mnesia_actor_heartbeat/3,
mnesia_clean_incarnation/1,
apply_actor_operation/5
@ -31,6 +33,9 @@
%% Strictly monotonically increasing integer.
-type smint() :: integer().
%% Remote cluster name
-type cluster() :: binary().
%% Actor.
%% Identifies an independent route replication actor on the remote broker.
%% Usually something like `node()` or `{node(), _Shard}`.
@ -45,17 +50,18 @@
%% Operation.
%% RouteID should come in handy when two or more different routes on the actor side
%% are "intersected" to the same topic filter that needs to be replicated here.
-type op() :: {add | del, _TopicFilter :: binary(), _RouteID} | heartbeat.
-type op() :: {add | delete, {_TopicFilter :: binary(), _RouteID}} | heartbeat.
%% Basically a bit offset.
%% Each actor + incarnation pair occupies a separate lane in the multi-counter.
%% Example:
%% Actors | n1@ds n2@ds n3@ds
%% Lanes | 0 1 2
%% ---------------------------
%% Op1 | n3@ds add client/42/# MCounter += 1 bsl 2 = 4
%% Op2 | n2@ds add client/42/# MCounter += 1 bsl 1 = 6
%% Op3 | n3@ds del client/42/# MCounter -= 1 bsl 2 = 2
%% Op4 | n2@ds del client/42/# MCounter -= 1 bsl 1 = 0 route deleted
%% Op3 | n3@ds delete client/42/# MCounter -= 1 bsl 2 = 2
%% Op4 | n2@ds delete client/42/# MCounter -= 1 bsl 1 = 0 route deleted
-type lane() :: non_neg_integer().
-define(DEFAULT_ACTOR_TTL_MS, 30_000).
@ -64,13 +70,16 @@
-define(EXTROUTE_TAB, emqx_external_router_route).
-define(EXTROUTE_ACTOR_TAB, emqx_external_router_actor).
-define(ACTOR_ID(Cluster, Actor), {Cluster, Actor}).
-define(ROUTE_ID(Cluster, RouteID), {Cluster, RouteID}).
-record(extroute, {
entry :: emqx_topic_index:key(_RouteID),
mcounter = 0 :: non_neg_integer()
}).
-record(actor, {
id :: actor(),
id :: {cluster(), actor()},
incarnation :: incarnation(),
lane :: lane(),
until :: _Timestamp
@ -102,7 +111,7 @@ create_tables() ->
]}
]}
]),
[?EXTROUTE_TAB].
[?EXTROUTE_ACTOR_TAB, ?EXTROUTE_TAB].
%%
@ -124,6 +133,7 @@ match_to_route(M) ->
%%
-record(state, {
cluster :: cluster(),
actor :: actor(),
incarnation :: incarnation(),
lane :: lane() | undefined
@ -133,19 +143,19 @@ match_to_route(M) ->
-type env() :: #{timestamp := _Milliseconds}.
-spec actor_init(actor(), incarnation(), env()) -> {ok, state()}.
actor_init(Actor, Incarnation, Env = #{timestamp := Now}) ->
-spec actor_init(cluster(), actor(), incarnation(), env()) -> {ok, state()}.
actor_init(Cluster, Actor, Incarnation, Env = #{timestamp := Now}) ->
%% TODO: Rolling upgrade safety?
case transaction(fun ?MODULE:mnesia_actor_init/3, [Actor, Incarnation, Now]) of
case transaction(fun ?MODULE:mnesia_actor_init/4, [Cluster, Actor, Incarnation, Now]) of
{ok, State} ->
{ok, State};
{reincarnate, Rec} ->
%% TODO: Do this asynchronously.
ok = clean_incarnation(Rec),
actor_init(Actor, Incarnation, Env)
actor_init(Cluster, Actor, Incarnation, Env)
end.
mnesia_actor_init(Actor, Incarnation, TS) ->
mnesia_actor_init(Cluster, Actor, Incarnation, TS) ->
%% NOTE
%% We perform this heavy-weight transaction only in the case of a new route
%% replication connection. The implicit assumption is that each replication
@ -154,15 +164,15 @@ mnesia_actor_init(Actor, Incarnation, TS) ->
%% ClientID. There's always a chance of having stray process severely lagging
%% that applies some update out of the blue, but it seems impossible to prevent
%% it completely w/o transactions.
State = #state{actor = Actor, incarnation = Incarnation},
State = #state{cluster = Cluster, actor = Actor, incarnation = Incarnation},
case mnesia:read(?EXTROUTE_ACTOR_TAB, Actor, write) of
[#actor{incarnation = Incarnation, lane = Lane} = Rec] ->
ok = mnesia:write(?EXTROUTE_ACTOR_TAB, Rec#actor{until = bump_actor_ttl(TS)}, write),
{ok, State#state{lane = Lane}};
[] ->
Lane = mnesia_assign_lane(),
Lane = mnesia_assign_lane(Cluster),
Rec = #actor{
id = Actor,
id = ?ACTOR_ID(Cluster, Actor),
incarnation = Incarnation,
lane = Lane,
until = bump_actor_ttl(TS)
@ -175,21 +185,32 @@ mnesia_actor_init(Actor, Incarnation, TS) ->
mnesia:abort({outdated_incarnation_actor, Actor, Incarnation, Newer})
end.
-spec actor_state(cluster(), actor(), incarnation()) -> state().
actor_state(Cluster, Actor, Incarnation) ->
ActorID = ?ACTOR_ID(Cluster, Actor),
[#actor{lane = Lane}] = mnesia:dirty_read(?EXTROUTE_ACTOR_TAB, ActorID),
#state{cluster = Cluster, actor = Actor, incarnation = Incarnation, lane = Lane}.
-spec actor_apply_operation(op(), state()) -> state().
actor_apply_operation(Op, State) ->
actor_apply_operation(Op, State, #{}).
-spec actor_apply_operation(op(), state(), env()) -> state().
actor_apply_operation(
{OpName, TopicFilter, ID},
State = #state{actor = Actor, incarnation = Incarnation, lane = Lane},
{OpName, {TopicFilter, ID}},
State = #state{cluster = Cluster, actor = Actor, incarnation = Incarnation, lane = Lane},
_Env
) ->
Entry = emqx_topic_index:make_key(TopicFilter, ID),
ActorID = ?ACTOR_ID(Cluster, Actor),
Entry = emqx_topic_index:make_key(TopicFilter, ?ROUTE_ID(Cluster, ID)),
case mria_config:whoami() of
Role when Role /= replicant ->
apply_actor_operation(Actor, Incarnation, Entry, OpName, Lane);
apply_actor_operation(ActorID, Incarnation, Entry, OpName, Lane);
replicant ->
mria:async_dirty(
?EXTROUTE_SHARD,
fun ?MODULE:apply_actor_operation/5,
[Actor, Incarnation, Entry, OpName, Lane]
[ActorID, Incarnation, Entry, OpName, Lane]
)
end,
State;
@ -201,8 +222,8 @@ actor_apply_operation(
ok = transaction(fun ?MODULE:mnesia_actor_heartbeat/3, [Actor, Incarnation, Now]),
State.
apply_actor_operation(Actor, Incarnation, Entry, OpName, Lane) ->
_ = assert_current_incarnation(Actor, Incarnation),
apply_actor_operation(ActorID, Incarnation, Entry, OpName, Lane) ->
_ = assert_current_incarnation(ActorID, Incarnation),
apply_operation(Entry, OpName, Lane).
apply_operation(Entry, OpName, Lane) ->
@ -232,7 +253,7 @@ apply_operation(Entry, MCounter, OpName, Lane) ->
Marker when OpName =:= add ->
%% Already added.
MCounter;
Marker when OpName =:= del ->
Marker when OpName =:= delete ->
case mria:dirty_update_counter(?EXTROUTE_TAB, Entry, -Marker) of
0 ->
Record = #extroute{entry = Entry, mcounter = 0},
@ -241,7 +262,7 @@ apply_operation(Entry, MCounter, OpName, Lane) ->
C ->
C
end;
0 when OpName =:= del ->
0 when OpName =:= delete ->
%% Already deleted.
MCounter
end.
@ -257,18 +278,19 @@ actor_gc(#{timestamp := Now}) ->
ok
end.
mnesia_assign_lane() ->
Assignment = mnesia:foldl(
fun(#actor{lane = Lane}, Acc) ->
Acc bor (1 bsl Lane)
end,
mnesia_assign_lane(Cluster) ->
Assignment = lists:foldl(
fun(Lane, Acc) -> Acc bor (1 bsl Lane) end,
0,
?EXTROUTE_ACTOR_TAB,
write
select_cluster_lanes(Cluster)
),
Lane = first_zero_bit(Assignment),
Lane.
select_cluster_lanes(Cluster) ->
MS = [{#actor{id = {Cluster, '_'}, lane = '$1', _ = '_'}, [], ['$1']}],
mnesia:select(?EXTROUTE_ACTOR_TAB, MS, write).
mnesia_actor_heartbeat(Actor, Incarnation, TS) ->
case mnesia:read(?EXTROUTE_ACTOR_TAB, Actor, write) of
[#actor{incarnation = Incarnation} = Rec] ->
@ -300,13 +322,13 @@ clean_lane(Lane) ->
?EXTROUTE_TAB
).
assert_current_incarnation(Actor, Incarnation) ->
assert_current_incarnation(ActorID, Incarnation) ->
%% NOTE
%% Ugly, but should not really happen anyway. This is a safety net for the case
%% when this process tries to apply some outdated operation for whatever reason
%% (e.g. heavy CPU starvation). Still, w/o transactions, it's just a best-effort
%% attempt.
[#actor{incarnation = Incarnation}] = mnesia:dirty_read(?EXTROUTE_ACTOR_TAB, Actor),
[#actor{incarnation = Incarnation}] = mnesia:dirty_read(?EXTROUTE_ACTOR_TAB, ActorID),
ok.
%%

View File

@ -45,6 +45,7 @@
]).
-export([
publish_actor_init_sync/3,
publish_route_sync/4,
encode_field/2
]).
@ -58,7 +59,7 @@
-define(CLIENTID(Base, Suffix), emqx_bridge_mqtt_lib:clientid_base([Base, Suffix])).
-define(MQTT_HOST_OPTS, #{default_port => 1883}).
-define(MY_CLUSTER_NAME, atom_to_binary(emqx_config:get([cluster, name]))).
-define(MY_CLUSTER_NAME, emqx_cluster_link_config:cluster()).
-define(ROUTE_TOPIC, <<?ROUTE_TOPIC_PREFIX, (?MY_CLUSTER_NAME)/binary>>).
-define(MSG_FWD_TOPIC, <<?MSG_TOPIC_PREFIX, (?MY_CLUSTER_NAME)/binary>>).
@ -90,6 +91,7 @@
-define(F_OPERATION, '$op').
-define(OP_ROUTE, <<"route">>).
-define(OP_ACTOR_INIT, <<"actor_init">>).
-define(F_ACTOR, 10).
-define(F_INCARNATION, 11).
@ -403,6 +405,18 @@ publish_result(Caller, Ref, Result) ->
Caller ! {pub_result, Ref, Err}
end.
%%% New leader-less Syncer/Actor implementation
publish_actor_init_sync(ClientPid, Actor, Incarnation) ->
%% TODO: handshake (request / response) to make sure the link is established
PubTopic = ?ROUTE_TOPIC,
Payload = #{
?F_OPERATION => ?OP_ACTOR_INIT,
?F_ACTOR => Actor,
?F_INCARNATION => Incarnation
},
emqtt:publish(ClientPid, PubTopic, ?ENCODE(Payload), ?QOS_1).
publish_route_sync(ClientPid, Actor, Incarnation, Updates) ->
PubTopic = ?ROUTE_TOPIC,
Payload = #{
@ -473,14 +487,28 @@ decode_ctrl_msg1(#{<<"op">> := ?UNLINK_OP}, _ClusterName) ->
decode_route_op(Payload) ->
decode_route_op1(?DECODE(Payload)).
decode_route_op1(<<"add_", Topic/binary>>) ->
{add, Topic};
decode_route_op1(<<"delete_", Topic/binary>>) ->
{delete, Topic};
decode_route_op1(#{<<"op">> := ?BATCH_ROUTES_OP, <<"topics">> := Topics}) when is_list(Topics) ->
{add, Topics};
decode_route_op1(#{<<"op">> := ?CLEANUP_ROUTES_OP}) ->
cleanup_routes;
decode_route_op1(#{
?F_OPERATION := ?OP_ACTOR_INIT,
?F_ACTOR := Actor,
?F_INCARNATION := Incr
}) ->
{actor_init, #{actor => Actor, incarnation => Incr}};
decode_route_op1(#{
?F_OPERATION := ?OP_ROUTE,
?F_ACTOR := Actor,
?F_INCARNATION := Incr,
?F_ROUTES := RouteOps
}) ->
RouteOps1 = lists:map(fun(Op) -> decode_field(route, Op) end, RouteOps),
{route_updates, #{actor => Actor, incarnation => Incr}, RouteOps1};
%%decode_route_op1(<<"add_", Topic/binary>>) ->
%% {add, Topic};
%%decode_route_op1(<<"delete_", Topic/binary>>) ->
%% {delete, Topic};
%%decode_route_op1(#{<<"op">> := ?BATCH_ROUTES_OP, <<"topics">> := Topics}) when is_list(Topics) ->
%% {add, Topics};
%%decode_route_op1(#{<<"op">> := ?CLEANUP_ROUTES_OP}) ->
%% cleanup_routes;
decode_route_op1(Payload) ->
?SLOG(warning, #{
msg => "unexpected_cluster_link_route_op_payload",
@ -528,6 +556,11 @@ encode_field(route, {add, Route = {_Topic, _ID}}) ->
encode_field(route, {delete, {Topic, ID}}) ->
{?ROUTE_DELETE, Topic, ID}.
decode_field(route, {?ROUTE_DELETE, Route = {_Topic, _ID}}) ->
{delete, Route};
decode_field(route, Route = {_Topic, _ID}) ->
{add, Route}.
%%--------------------------------------------------------------------
%% emqx_external_broker
%%--------------------------------------------------------------------

View File

@ -19,7 +19,8 @@
process_syncer_batch/4
]).
-behaviour(supervisor).
%% silence warning
%% -behaviour(supervisor).
-export([init/1]).
-behaviour(gen_server).
@ -99,7 +100,17 @@ ensure_actor_incarnation() ->
start_link_client(TargetCluster) ->
Options = emqx_cluster_link_config:emqtt_options(TargetCluster),
emqtt:start_link(refine_client_options(Options)).
case emqtt:start_link(refine_client_options(Options)) of
{ok, Pid} ->
case emqtt:connect(Pid) of
{ok, _Props} ->
{ok, Pid};
Error ->
Error
end;
Error ->
Error
end.
refine_client_options(Options = #{clientid := ClientID}) ->
%% TODO: Reconnect should help, but it looks broken right now.
@ -180,7 +191,7 @@ batch_get_opname(Op) ->
init({sup, TargetCluster}) ->
%% FIXME: Intensity.
SupFlags = #{
strategy => all_for_one,
strategy => one_for_all,
intensity => 10,
period => 60
},
@ -239,7 +250,7 @@ init_actor(State = #st{}) ->
{ok, State, {continue, connect}}.
handle_continue(connect, State) ->
process_connect(State).
{noreply, process_connect(State)}.
handle_call(_Request, _From, State) ->
{reply, ignored, State}.
@ -248,9 +259,9 @@ handle_cast(_Request, State) ->
{noreply, State}.
handle_info({'EXIT', ClientPid, Reason}, St = #st{client = ClientPid}) ->
handle_client_down(Reason, St);
{noreply, handle_client_down(Reason, St)};
handle_info({timeout, TRef, _Reconnect}, St = #st{reconnect_timer = TRef}) ->
process_connect(St#st{reconnect_timer = undefined});
{noreply, process_connect(St#st{reconnect_timer = undefined})};
handle_info(_Info, St) ->
%% TODO: log?
{noreply, St}.
@ -258,22 +269,25 @@ handle_info(_Info, St) ->
terminate(_Reason, _State) ->
ok.
process_connect(St = #st{actor = TargetCluster}) ->
process_connect(St = #st{target = TargetCluster, actor = Actor, incarnation = Incr}) ->
case start_link_client(TargetCluster) of
{ok, ClientPid} ->
ok = start_syncer(TargetCluster),
ok = announce_client(TargetCluster, ClientPid),
%% TODO: error handling, handshake
{ok, _} = emqx_cluster_link_mqtt:publish_actor_init_sync(ClientPid, Actor, Incr),
process_bootstrap(St#st{client = ClientPid});
{error, Reason} ->
handle_connect_error(Reason, St)
end.
handle_connect_error(Reason, St) ->
handle_connect_error(_Reason, St) ->
%% TODO: logs
TRef = erlang:start_timer(?RECONNECT_TIMEOUT, self(), reconnect),
St#st{reconnect_timer = TRef}.
handle_client_down(Reason, St = #st{target = TargetCluster}) ->
handle_client_down(_Reason, St = #st{target = TargetCluster}) ->
%% TODO: logs
ok = close_syncer(TargetCluster),
process_connect(St#st{client = undefined}).

View File

@ -22,13 +22,17 @@ init(LinksConf) ->
intensity => 10,
period => 5
},
Children = [sup_spec(?COORD_SUP, ?COORD_SUP, LinksConf)],
%% Children = [sup_spec(?COORD_SUP, ?COORD_SUP, LinksConf)],
Children = [
sup_spec(Name, emqx_cluster_link_router_syncer, [Name])
|| #{upstream := Name} <- LinksConf
],
{ok, {SupFlags, Children}}.
sup_spec(Id, Mod, Conf) ->
sup_spec(Id, Mod, Args) ->
#{
id => Id,
start => {Mod, start_link, [Conf]},
start => {Mod, start_link, Args},
restart => permanent,
shutdown => infinity,
type => supervisor,