From 4097585f5dae66b35ae290f4152c758a7c381ce4 Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Mon, 6 May 2024 18:33:45 +0200 Subject: [PATCH] fix(clusterlink): ensure extrouter works on replicants This is sort of a quick fix to make things safe, but it will likely be a subject to the same drawbacks as the regular router in high-latency deployments: reduced throughput. --- .../src/emqx_cluster_link_extrouter.erl | 33 ++++++++++++++--- .../emqx_cluster_link_extrouter_SUITE.erl | 37 ++++++++++++++----- 2 files changed, 55 insertions(+), 15 deletions(-) diff --git a/apps/emqx_cluster_link/src/emqx_cluster_link_extrouter.erl b/apps/emqx_cluster_link/src/emqx_cluster_link_extrouter.erl index ec25461a7..76999f4cf 100644 --- a/apps/emqx_cluster_link/src/emqx_cluster_link_extrouter.erl +++ b/apps/emqx_cluster_link/src/emqx_cluster_link_extrouter.erl @@ -20,6 +20,14 @@ actor_gc/1 ]). +%% Internal API +-export([ + mnesia_actor_init/3, + mnesia_actor_heartbeat/3, + mnesia_clean_incarnation/1, + apply_actor_operation/5 +]). + %% Strictly monotonically increasing integer. -type smint() :: integer(). @@ -127,8 +135,8 @@ match_to_route(M) -> -spec actor_init(actor(), incarnation(), env()) -> {ok, state()}. actor_init(Actor, Incarnation, Env = #{timestamp := Now}) -> - %% FIXME: Sane transactions. - case transaction(fun mnesia_actor_init/3, [Actor, Incarnation, Now]) of + %% TODO: Rolling upgrade safety? + case transaction(fun ?MODULE:mnesia_actor_init/3, [Actor, Incarnation, Now]) of {ok, State} -> {ok, State}; {reincarnate, Rec} -> @@ -173,17 +181,30 @@ actor_apply_operation( State = #state{actor = Actor, incarnation = Incarnation, lane = Lane}, _Env ) -> - _ = assert_current_incarnation(Actor, Incarnation), - _ = apply_operation(emqx_topic_index:make_key(TopicFilter, ID), OpName, Lane), + Entry = emqx_topic_index:make_key(TopicFilter, ID), + case mria_config:whoami() of + Role when Role /= replicant -> + apply_actor_operation(Actor, Incarnation, Entry, OpName, Lane); + replicant -> + mria:async_dirty( + ?EXTROUTE_SHARD, + fun ?MODULE:apply_actor_operation/5, + [Actor, Incarnation, Entry, OpName, Lane] + ) + end, State; actor_apply_operation( heartbeat, State = #state{actor = Actor, incarnation = Incarnation}, _Env = #{timestamp := Now} ) -> - ok = transaction(fun mnesia_actor_heartbeat/3, [Actor, Incarnation, Now]), + ok = transaction(fun ?MODULE:mnesia_actor_heartbeat/3, [Actor, Incarnation, Now]), State. +apply_actor_operation(Actor, Incarnation, Entry, OpName, Lane) -> + _ = assert_current_incarnation(Actor, Incarnation), + apply_operation(Entry, OpName, Lane). + apply_operation(Entry, OpName, Lane) -> %% NOTE %% This is safe sequence of operations only on core nodes. On replicants, @@ -259,7 +280,7 @@ mnesia_actor_heartbeat(Actor, Incarnation, TS) -> end. clean_incarnation(Rec) -> - transaction(fun mnesia_clean_incarnation/1, [Rec]). + transaction(fun ?MODULE:mnesia_clean_incarnation/1, [Rec]). mnesia_clean_incarnation(#actor{id = Actor, incarnation = Incarnation, lane = Lane}) -> case mnesia:read(?EXTROUTE_ACTOR_TAB, Actor, write) of diff --git a/apps/emqx_cluster_link/test/emqx_cluster_link_extrouter_SUITE.erl b/apps/emqx_cluster_link/test/emqx_cluster_link_extrouter_SUITE.erl index e83698895..fffca47c7 100644 --- a/apps/emqx_cluster_link/test/emqx_cluster_link_extrouter_SUITE.erl +++ b/apps/emqx_cluster_link/test/emqx_cluster_link_extrouter_SUITE.erl @@ -33,6 +33,12 @@ end_per_testcase(TC, Config) -> init_db() -> mria:wait_for_tables(emqx_cluster_link_extrouter:create_tables()). +init_db_nodes(Nodes) -> + ok = lists:foreach( + fun(Node) -> ok = erpc:call(Node, ?MODULE, init_db, []) end, + Nodes + ). + %% t_consistent_routing_view(_Config) -> @@ -174,20 +180,15 @@ t_consistent_routing_view_concurrent_updates(_Config) -> t_consistent_routing_view_concurrent_cluster_updates('init', Config) -> Specs = [ - {emqx_external_router1, #{role => core}}, - {emqx_external_router2, #{role => core}}, - {emqx_external_router3, #{role => core}} + {emqx_cluster_link_extrouter1, #{role => core}}, + {emqx_cluster_link_extrouter2, #{role => core}}, + {emqx_cluster_link_extrouter3, #{role => core}} ], Cluster = emqx_cth_cluster:start( Specs, #{work_dir => emqx_cth_suite:work_dir(?FUNCTION_NAME, Config)} ), - ok = lists:foreach( - fun(Node) -> - ok = erpc:call(Node, ?MODULE, init_db, []) - end, - Cluster - ), + ok = init_db_nodes(Cluster), [{cluster, Cluster} | Config]; t_consistent_routing_view_concurrent_cluster_updates('end', Config) -> ok = emqx_cth_cluster:stop(?config(cluster, Config)). @@ -236,6 +237,24 @@ t_consistent_routing_view_concurrent_cluster_updates(Config) -> erpc:call(N1, ?MODULE, topics_sorted, []) ). +t_consistent_routing_view_concurrent_cluster_replicant_updates('init', Config) -> + Specs = [ + {emqx_cluster_link_extrouter_repl1, #{role => core}}, + {emqx_cluster_link_extrouter_repl2, #{role => core}}, + {emqx_cluster_link_extrouter_repl3, #{role => replicant}} + ], + Cluster = emqx_cth_cluster:start( + Specs, + #{work_dir => emqx_cth_suite:work_dir(?FUNCTION_NAME, Config)} + ), + ok = init_db_nodes(Cluster), + [{cluster, Cluster} | Config]; +t_consistent_routing_view_concurrent_cluster_replicant_updates('end', Config) -> + ok = emqx_cth_cluster:stop(?config(cluster, Config)). + +t_consistent_routing_view_concurrent_cluster_replicant_updates(Config) -> + t_consistent_routing_view_concurrent_cluster_updates(Config). + run_remote_actor({Node, Run}) -> erlang:spawn_monitor(Node, ?MODULE, run_actor, [Run]).