diff --git a/apps/emqx_cluster_link/src/emqx_cluster_link_extrouter.erl b/apps/emqx_cluster_link/src/emqx_cluster_link_extrouter.erl new file mode 100644 index 000000000..ec25461a7 --- /dev/null +++ b/apps/emqx_cluster_link/src/emqx_cluster_link_extrouter.erl @@ -0,0 +1,318 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- + +-module(emqx_cluster_link_extrouter). + +-export([create_tables/0]). + +%% Router API +-export([ + match_routes/1, + lookup_routes/1, + topics/0 +]). + +%% Actor API +-export([ + actor_init/3, + actor_apply_operation/3, + actor_gc/1 +]). + +%% Strictly monotonically increasing integer. +-type smint() :: integer(). + +%% Actor. +%% Identifies an independent route replication actor on the remote broker. +%% Usually something like `node()` or `{node(), _Shard}`. +-type actor() :: term(). + +%% Identifies incarnation of the actor. +%% In the event of actor restart, it's the actor's responsibility to keep track of +%% monotonicity of its incarnation number. Each time actor's incarnation increases, +%% we assume that all the state of the previous incarnations is lost. +-type incarnation() :: smint(). + +%% Operation. +%% RouteID should come in handy when two or more different routes on the actor side +%% are "intersected" to the same topic filter that needs to be replicated here. +-type op() :: {add | del, _TopicFilter :: binary(), _RouteID} | heartbeat. + +%% Basically a bit offset. +%% Each actor + incarnation pair occupies a separate lane in the multi-counter. 
+%% Example:
+%% Actors | n1@ds  n2@ds  n3@ds
+%% Lanes  | 0      1      2
+%% Op1    | n3@ds add client/42/# → MCounter += 1 bsl 2 = 4
+%% Op2    | n2@ds add client/42/# → MCounter += 1 bsl 1 = 6
+%% Op3    | n3@ds del client/42/# → MCounter -= 1 bsl 2 = 2
+%% Op4    | n2@ds del client/42/# → MCounter -= 1 bsl 1 = 0 → route deleted
+-type lane() :: non_neg_integer().
+
+-define(DEFAULT_ACTOR_TTL_MS, 30_000).
+
+-define(EXTROUTE_SHARD, ?MODULE).
+-define(EXTROUTE_TAB, emqx_external_router_route).
+-define(EXTROUTE_ACTOR_TAB, emqx_external_router_actor).
+
+%% One row per replicated `{TopicFilter, RouteID}` entry. `mcounter` is the
+%% multi-counter described above: one bit per lane that currently holds the route.
+-record(extroute, {
+    entry :: emqx_topic_index:key(_RouteID),
+    mcounter = 0 :: non_neg_integer()
+}).
+
+%% One row per known actor: its current incarnation, the lane assigned to it and
+%% the timestamp until which it is considered alive.
+-record(actor, {
+    id :: actor(),
+    incarnation :: incarnation(),
+    lane :: lane(),
+    until :: _Timestamp
+}).
+
+%%
+
+%% Creates the actor table and the external route table in `?EXTROUTE_SHARD`.
+%% Returns the names of both tables, so that callers handing the result to
+%% `mria:wait_for_tables/1` wait for every table this module depends on, not
+%% only for the route table.
+create_tables() ->
+    %% TODO: Table per link viable?
+    mria_config:set_dirty_shard(?EXTROUTE_SHARD, true),
+    ok = mria:create_table(?EXTROUTE_ACTOR_TAB, [
+        {type, set},
+        {rlog_shard, ?EXTROUTE_SHARD},
+        {storage, ram_copies},
+        {record_name, actor},
+        {attributes, record_info(fields, actor)}
+    ]),
+    ok = mria:create_table(?EXTROUTE_TAB, [
+        {type, ordered_set},
+        {rlog_shard, ?EXTROUTE_SHARD},
+        {storage, ram_copies},
+        {record_name, extroute},
+        {attributes, record_info(fields, extroute)},
+        {storage_properties, [
+            {ets, [
+                {read_concurrency, true},
+                {write_concurrency, true},
+                {decentralized_counters, true}
+            ]}
+        ]}
+    ]),
+    [?EXTROUTE_ACTOR_TAB, ?EXTROUTE_TAB].
+
+%%
+
+%% Returns the topic filters of the external routes matching `Topic`.
+%% Matching is delegated to `emqx_topic_index:matches/3` with the `unique` option.
+match_routes(Topic) ->
+    Matches = emqx_topic_index:matches(Topic, ?EXTROUTE_TAB, [unique]),
+    [match_to_route(M) || M <- Matches].
+
+%% Returns one route per table entry whose key was built from exactly `Topic`,
+%% regardless of the route ID.
+lookup_routes(Topic) ->
+    %% NOTE
+    %% Whole records are needed here, hence `ets:match_object/2`. `ets:match/2`
+    %% returns only the pattern variable bindings (the route IDs in this case),
+    %% which cannot be accessed as `#extroute{}` records.
+    Pat = #extroute{entry = emqx_topic_index:make_key(Topic, '_'), _ = '_'},
+    [match_to_route(R#extroute.entry) || R <- ets:match_object(?EXTROUTE_TAB, Pat)].
+
+%% Lists the topic of every entry in the route table, one element per entry
+%% (the same topic appears once per distinct route ID).
+topics() ->
+    Pat = #extroute{entry = '$1', _ = '_'},
+    [emqx_topic_index:get_topic(K) || [K] <- ets:match(?EXTROUTE_TAB, Pat)].
+
+%% Converts a topic index key / match into the route representation (its topic).
+match_to_route(M) ->
+    emqx_topic_index:get_topic(M).
+
+%%
+
+%% Handle returned by `actor_init/3` and threaded through
+%% `actor_apply_operation/3`.
+-record(state, {
+    actor :: actor(),
+    incarnation :: incarnation(),
+    lane :: lane() | undefined
+}).
+
+-type state() :: #state{}.
+
+-type env() :: #{timestamp := _Milliseconds}.
+
+%% Registers (or re-registers) `Actor` with the given `Incarnation`.
+%% When a record of an older incarnation is found, that incarnation is cleaned
+%% up first and the initialization is retried.
+-spec actor_init(actor(), incarnation(), env()) -> {ok, state()}.
+actor_init(Actor, Incarnation, Env = #{timestamp := Now}) ->
+    %% FIXME: Sane transactions.
+    InitResult = transaction(fun mnesia_actor_init/3, [Actor, Incarnation, Now]),
+    case InitResult of
+        {reincarnate, StaleRec} ->
+            %% TODO: Do this asynchronously.
+            ok = clean_incarnation(StaleRec),
+            actor_init(Actor, Incarnation, Env);
+        {ok, State} ->
+            {ok, State}
+    end.
+
+%% Transaction body of `actor_init/3`: reads the actor record under a write
+%% lock and decides what to do based on what is already registered.
+mnesia_actor_init(Actor, Incarnation, TS) ->
+    %% NOTE
+    %% We perform this heavy-weight transaction only in the case of a new route
+    %% replication connection. The implicit assumption is that each replication
+    %% channel is uniquely identified by the ClientID (reflecting the Actor), and
+    %% the broker will take care of ensuring that there's only one connection per
+    %% ClientID. There's always a chance of having stray process severely lagging
+    %% that applies some update out of the blue, but it seems impossible to prevent
+    %% it completely w/o transactions.
+    Registered = mnesia:read(?EXTROUTE_ACTOR_TAB, Actor, write),
+    mnesia_actor_register(Registered, Actor, Incarnation, TS).
+
+%% Same incarnation is already registered: prolong its TTL and reuse its lane.
+mnesia_actor_register(
+    [#actor{incarnation = Incarnation, lane = Lane} = Rec], Actor, Incarnation, TS
+) ->
+    ok = mnesia:write(?EXTROUTE_ACTOR_TAB, Rec#actor{until = bump_actor_ttl(TS)}, write),
+    {ok, #state{actor = Actor, incarnation = Incarnation, lane = Lane}};
+%% Unknown actor: assign a free lane and register it.
+mnesia_actor_register([], Actor, Incarnation, TS) ->
+    Lane = mnesia_assign_lane(),
+    NewRec = #actor{
+        id = Actor,
+        incarnation = Incarnation,
+        lane = Lane,
+        until = bump_actor_ttl(TS)
+    },
+    ok = mnesia:write(?EXTROUTE_ACTOR_TAB, NewRec, write),
+    {ok, #state{actor = Actor, incarnation = Incarnation, lane = Lane}};
+%% Older incarnation is registered: let the caller clean it up and retry.
+mnesia_actor_register([#actor{incarnation = Outdated} = Rec], _Actor, Incarnation, _TS) when
+    Incarnation > Outdated
+->
+    {reincarnate, Rec};
+%% Newer incarnation is registered: the request itself is outdated.
+mnesia_actor_register([#actor{incarnation = Newer}], Actor, Incarnation, _TS) ->
+    mnesia:abort({outdated_incarnation_actor, Actor, Incarnation, Newer}).
+
+%% Applies a single operation on behalf of the actor described by the state.
+-spec actor_apply_operation(op(), state(), env()) -> state().
+actor_apply_operation(
+    heartbeat,
+    State = #state{actor = Actor, incarnation = Incarnation},
+    _Env = #{timestamp := Now}
+) ->
+    %% Heartbeats only prolong the actor's TTL.
+    ok = transaction(fun mnesia_actor_heartbeat/3, [Actor, Incarnation, Now]),
+    State;
+actor_apply_operation(
+    {OpName, TopicFilter, ID},
+    State = #state{actor = Actor, incarnation = Incarnation, lane = Lane},
+    _Env
+) ->
+    _ = assert_current_incarnation(Actor, Incarnation),
+    Entry = emqx_topic_index:make_key(TopicFilter, ID),
+    _ = apply_operation(Entry, OpName, Lane),
+    State.
+
+%% Reads the current multi-counter of `Entry` (0 when there is no such entry)
+%% and applies the operation to the given lane.
+apply_operation(Entry, OpName, Lane) ->
+    %% NOTE
+    %% This is safe sequence of operations only on core nodes. On replicants,
+    %% `mria:dirty_update_counter/3` will be replicated asynchronously, which
+    %% means this read can be stale.
+    Current =
+        case mnesia:dirty_read(?EXTROUTE_TAB, Entry) of
+            [#extroute{mcounter = MCounter}] -> MCounter;
+            [] -> 0
+        end,
+    apply_operation(Entry, Current, OpName, Lane).
+
+%% Sets (`add`) or clears (`del`) the bit of `Lane` in the multi-counter of
+%% `Entry`, given the counter value `MCounter` observed by the caller.
+%% Returns the resulting counter value.
+apply_operation(Entry, MCounter, OpName, Lane) ->
+    %% NOTE
+    %% We are relying on the fact that changes to each individual lane of this
+    %% multi-counter are synchronized. Without this, such counter updates would
+    %% be unsafe. Instead, we would have to use another, more complex approach,
+    %% that runs `ets:lookup/2` + `ets:select_replace/2` in a loop until the
+    %% counter is updated accordingly.
+    Marker = 1 bsl Lane,
+    case MCounter band Marker of
+        0 when OpName =:= add ->
+            mria:dirty_update_counter(?EXTROUTE_TAB, Entry, Marker);
+        Marker when OpName =:= add ->
+            %% Already added.
+            MCounter;
+        Marker when OpName =:= del ->
+            release_lane_marker(Entry, Marker);
+        0 when OpName =:= del ->
+            %% Already deleted.
+            MCounter
+    end.
+
+%% Clears the lane bit and drops the entry once no lane holds it anymore.
+release_lane_marker(Entry, Marker) ->
+    case mria:dirty_update_counter(?EXTROUTE_TAB, Entry, -Marker) of
+        0 ->
+            Emptied = #extroute{entry = Entry, mcounter = 0},
+            ok = mria:dirty_delete_object(?EXTROUTE_TAB, Emptied),
+            0;
+        Remaining ->
+            Remaining
+    end.
+
+%% Cleans up at most one actor whose TTL expired before the given timestamp.
+-spec actor_gc(env()) -> ok.
+actor_gc(#{timestamp := Now}) ->
+    ExpiredMS = [{#actor{until = '$1', _ = '_'}, [{'<', '$1', Now}], ['$_']}],
+    gc_first_expired(mnesia:dirty_select(?EXTROUTE_ACTOR_TAB, ExpiredMS)).
+
+gc_first_expired([]) ->
+    ok;
+gc_first_expired([Expired | _Rest]) ->
+    %% NOTE: One at a time.
+    clean_incarnation(Expired).
+
+%% Picks the lowest lane not occupied by any registered actor.
+%% Must run inside a transaction: the actor table is folded under a write lock.
+mnesia_assign_lane() ->
+    MarkLane = fun(#actor{lane = Taken}, Acc) -> Acc bor (1 bsl Taken) end,
+    Occupied = mnesia:foldl(MarkLane, 0, ?EXTROUTE_ACTOR_TAB, write),
+    first_zero_bit(Occupied).
+
+%% Transaction body of the heartbeat operation: prolongs the actor's TTL, or
+%% aborts if the actor is unknown or registered under another incarnation.
+mnesia_actor_heartbeat(Actor, Incarnation, TS) ->
+    Registered = mnesia:read(?EXTROUTE_ACTOR_TAB, Actor, write),
+    mnesia_bump_actor(Registered, Actor, Incarnation, TS).
+
+mnesia_bump_actor([#actor{incarnation = Incarnation} = Rec], _Actor, Incarnation, TS) ->
+    ok = mnesia:write(?EXTROUTE_ACTOR_TAB, Rec#actor{until = bump_actor_ttl(TS)}, write);
+mnesia_bump_actor([#actor{incarnation = Outdated}], Actor, Incarnation, _TS) ->
+    mnesia:abort({outdated_incarnation_actor, Actor, Incarnation, Outdated});
+mnesia_bump_actor([], Actor, _Incarnation, _TS) ->
+    mnesia:abort({nonexistent_actor, Actor}).
+
+%% Removes the routes and the registration of the given actor incarnation.
+clean_incarnation(Rec) ->
+    transaction(fun mnesia_clean_incarnation/1, [Rec]).
+
+%% Transaction body of `clean_incarnation/1`. Does nothing if the actor record
+%% no longer refers to the incarnation being cleaned.
+mnesia_clean_incarnation(#actor{id = Actor, incarnation = Incarnation, lane = Lane}) ->
+    case mnesia:read(?EXTROUTE_ACTOR_TAB, Actor, write) of
+        [#actor{incarnation = Current}] when Current =:= Incarnation ->
+            _ = clean_lane(Lane),
+            mnesia:delete(?EXTROUTE_ACTOR_TAB, Actor, write);
+        _Renewed ->
+            ok
+    end.
+
+%% Applies `del` for the given lane to every entry of the route table.
+clean_lane(Lane) ->
+    DropLane = fun(#extroute{entry = Entry, mcounter = MCounter}, _Acc) ->
+        apply_operation(Entry, MCounter, del, Lane)
+    end,
+    ets:foldl(DropLane, 0, ?EXTROUTE_TAB).
+
+%% Crashes with `badmatch` unless `Actor` is registered with exactly `Incarnation`.
+assert_current_incarnation(Actor, Incarnation) ->
+    %% NOTE
+    %% Ugly, but should not really happen anyway. This is a safety net for the case
+    %% when this process tries to apply some outdated operation for whatever reason
+    %% (e.g. heavy CPU starvation). Still, w/o transactions, it's just a best-effort
+    %% attempt.
+    Registered = mnesia:dirty_read(?EXTROUTE_ACTOR_TAB, Actor),
+    [#actor{incarnation = Incarnation}] = Registered,
+    ok.
+
+%%
+
+%% Runs `Fun` with `Args` as a transaction on `?EXTROUTE_SHARD`.
+%% Returns the transaction result; an aborted transaction is raised as an error.
+transaction(Fun, Args) ->
+    transaction_result(mria:transaction(?EXTROUTE_SHARD, Fun, Args)).
+
+transaction_result({atomic, Result}) ->
+    Result;
+transaction_result({aborted, Reason}) ->
+    error(Reason).
+
+%%
+
+%% Index (starting from 0) of the least significant zero bit of `N`.
+first_zero_bit(N) ->
+    first_zero_bit(N, 0).
+
+first_zero_bit(N, I) when N band 1 =:= 0 ->
+    I;
+first_zero_bit(N, I) ->
+    first_zero_bit(N bsr 1, I + 1).
+
+%%
+
+%% Timestamp until which an actor seen at `TS` is considered alive.
+bump_actor_ttl(TS) ->
+    TTL = get_actor_ttl(),
+    TS + TTL.
+
+get_actor_ttl() ->
+    ?DEFAULT_ACTOR_TTL_MS.
diff --git a/apps/emqx_cluster_link/test/emqx_cluster_link_extrouter_SUITE.erl b/apps/emqx_cluster_link/test/emqx_cluster_link_extrouter_SUITE.erl
new file mode 100644
index 000000000..e83698895
--- /dev/null
+++ b/apps/emqx_cluster_link/test/emqx_cluster_link_extrouter_SUITE.erl
@@ -0,0 +1,286 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+
+-module(emqx_cluster_link_extrouter_SUITE).
+
+-include_lib("common_test/include/ct.hrl").
+-include_lib("eunit/include/eunit.hrl").
+-include_lib("emqx/include/asserts.hrl").
+
+-compile(export_all).
+-compile(nowarn_export_all).
+
+%%
+
+all() ->
+    emqx_common_test_helpers:all(?MODULE).
+
+%% Starts the suite with an empty application list and creates the tables.
+init_per_suite(Config) ->
+    WorkDir = emqx_cth_suite:work_dir(Config),
+    Apps = emqx_cth_suite:start([], #{work_dir => WorkDir}),
+    ok = init_db(),
+    [{apps, Apps} | Config].
+
+end_per_suite(Config) ->
+    Apps = ?config(apps, Config),
+    ok = emqx_cth_suite:stop(Apps).
+
+init_per_testcase(TC, Config) ->
+    emqx_common_test_helpers:init_per_testcase(?MODULE, TC, Config).
+
+end_per_testcase(TC, Config) ->
+    emqx_common_test_helpers:end_per_testcase(?MODULE, TC, Config).
+
+%% Creates the extrouter tables and waits for the tables `create_tables/0` returns.
+init_db() ->
+    Tables = emqx_cluster_link_extrouter:create_tables(),
+    mria:wait_for_tables(Tables).
+
+%%
+
+%% Interleaves add/del operations of three actors and verifies the resulting
+%% routing view after each phase.
+t_consistent_routing_view(_Config) ->
+    MultiLevel = <<"t/client/#">>,
+    TwoLevels = <<"t/client/+/+">>,
+    InitActor = fun(N) ->
+        {ok, AS} = emqx_cluster_link_extrouter:actor_init({?FUNCTION_NAME, N}, 1, env()),
+        AS
+    end,
+    AS10 = InitActor(1),
+    AS20 = InitActor(2),
+    AS30 = InitActor(3),
+    %% Add few routes originating from different actors.
+    %% Also test that route operations are idempotent.
+    AS11 = apply_operation({add, MultiLevel, id}, AS10),
+    _AS11 = apply_operation({add, MultiLevel, id}, AS10),
+    AS21 = apply_operation({add, MultiLevel, id}, AS20),
+    AS31 = apply_operation({add, TwoLevels, id1}, AS30),
+    AS32 = apply_operation({add, TwoLevels, id2}, AS31),
+    _AS22 = apply_operation({del, MultiLevel, id}, AS21),
+    AS12 = apply_operation({add, TwoLevels, id1}, AS11),
+    AS33 = apply_operation({del, TwoLevels, id1}, AS32),
+    _AS34 = apply_operation({del, TwoLevels, id2}, AS33),
+    ?assertEqual(
+        [MultiLevel, TwoLevels],
+        topics_sorted()
+    ),
+    ?assertEqual(
+        [MultiLevel],
+        lists:sort(emqx_cluster_link_extrouter:match_routes(<<"t/client/42">>))
+    ),
+    %% Remove all routes from the actors.
+    AS13 = apply_operation({del, MultiLevel, id}, AS12),
+    AS14 = apply_operation({del, TwoLevels, id1}, AS13),
+    %% Deleting once more must yield the very same state.
+    AS14 = apply_operation({del, TwoLevels, id1}, AS13),
+    ?assertEqual(
+        [],
+        topics_sorted()
+    ).
+
+%% Re-initializing an actor with a higher incarnation must drop the routes of
+%% the previous incarnation and reject operations made with the stale state.
+t_actor_reincarnation(_Config) ->
+    Actor1 = {?FUNCTION_NAME, 1},
+    Actor2 = {?FUNCTION_NAME, 2},
+    TopicAll = <<"topic/#">>,
+    Topic42 = <<"topic/42/+">>,
+    TooLate = {add, <<"toolate/#">>, id},
+    {ok, AS10} = emqx_cluster_link_extrouter:actor_init(Actor1, 1, env()),
+    {ok, AS20} = emqx_cluster_link_extrouter:actor_init(Actor2, 1, env()),
+    AS11 = apply_operation({add, TopicAll, id}, AS10),
+    AS12 = apply_operation({add, Topic42, id}, AS11),
+    AS21 = apply_operation({add, TopicAll, id}, AS20),
+    ?assertEqual(
+        [TopicAll, Topic42],
+        topics_sorted()
+    ),
+    {ok, _AS3} = emqx_cluster_link_extrouter:actor_init(Actor1, 2, env()),
+    ?assertError(
+        _IncarnationMismatch,
+        apply_operation(TooLate, AS12)
+    ),
+    ?assertEqual(
+        [TopicAll],
+        topics_sorted()
+    ),
+    {ok, _AS4} = emqx_cluster_link_extrouter:actor_init(Actor2, 2, env()),
+    ?assertError(
+        _IncarnationMismatch,
+        apply_operation(TooLate, AS21)
+    ),
+    ?assertEqual(
+        [],
+        topics_sorted()
+    ).
+
+%% Actors whose TTL expired are collected by `actor_gc/1` together with their
+%% routes; a heartbeat prolongs the TTL.
+t_actor_gc(_Config) ->
+    Actor1 = {?FUNCTION_NAME, 1},
+    Actor2 = {?FUNCTION_NAME, 2},
+    TopicAll = <<"topic/#">>,
+    Topic42 = <<"topic/42/+">>,
+    GlobalAll = <<"global/#">>,
+    {ok, AS10} = emqx_cluster_link_extrouter:actor_init(Actor1, 1, env()),
+    {ok, AS20} = emqx_cluster_link_extrouter:actor_init(Actor2, 1, env()),
+    AS11 = apply_operation({add, TopicAll, id}, AS10),
+    AS12 = apply_operation({add, Topic42, id}, AS11),
+    AS21 = apply_operation({add, GlobalAll, id}, AS20),
+    ?assertEqual(
+        [GlobalAll, TopicAll, Topic42],
+        topics_sorted()
+    ),
+    %% Only the first actor sends a heartbeat before the first GC pass.
+    _AS13 = apply_operation(heartbeat, AS12, 50_000),
+    ok = emqx_cluster_link_extrouter:actor_gc(env(60_000)),
+    ?assertEqual(
+        [TopicAll, Topic42],
+        topics_sorted()
+    ),
+    ?assertError(
+        _IncarnationMismatch,
+        apply_operation({add, <<"toolate/#">>, id}, AS21)
+    ),
+    ok = emqx_cluster_link_extrouter:actor_gc(env(120_000)),
+    ?assertEqual(
+        [],
+        topics_sorted()
+    ).
+
+%% Runs three actors and a GC process concurrently on the local node and checks
+%% the final routing view.
+t_consistent_routing_view_concurrent_updates(_Config) ->
+    ClientAll = <<"t/client/#">>,
+    ClientTwo = <<"t/client/+/+">>,
+    GlobalAll = <<"global/#">>,
+    A1Seq = repeat(10, [
+        reincarnate,
+        {add, ClientAll, id},
+        {add, ClientTwo, id1},
+        {add, ClientTwo, id1},
+        {del, ClientAll, id}
+    ]),
+    A2Seq = repeat(10, [
+        {add, GlobalAll, id},
+        {add, ClientTwo, id1},
+        {add, ClientTwo, id2},
+        {del, ClientTwo, id1},
+        heartbeat
+    ]),
+    A3Seq = repeat(10, [
+        {add, GlobalAll, id},
+        {del, GlobalAll, id},
+        {add, ClientTwo, id1},
+        {del, ClientTwo, id1},
+        {add, ClientTwo, id2},
+        {del, ClientTwo, id2},
+        reincarnate
+    ]),
+    A4Seq = repeat(10, [
+        gc,
+        {sleep, 1}
+    ]),
+    Runs = [
+        {{?FUNCTION_NAME, 1}, A1Seq},
+        {{?FUNCTION_NAME, 2}, A2Seq},
+        {{?FUNCTION_NAME, 3}, A3Seq},
+        {{?FUNCTION_NAME, gc}, A4Seq}
+    ],
+    _ = emqx_utils:pmap(fun run_actor/1, Runs, infinity),
+    ?assertEqual(
+        [GlobalAll, ClientTwo, ClientTwo],
+        topics_sorted()
+    ).
+
+%% Per-testcase setup / teardown: a cluster of three core nodes, each with the
+%% extrouter tables initialized.
+t_consistent_routing_view_concurrent_cluster_updates('init', Config) ->
+    NodeSpecs = [
+        {emqx_external_router1, #{role => core}},
+        {emqx_external_router2, #{role => core}},
+        {emqx_external_router3, #{role => core}}
+    ],
+    WorkDir = emqx_cth_suite:work_dir(?FUNCTION_NAME, Config),
+    Cluster = emqx_cth_cluster:start(NodeSpecs, #{work_dir => WorkDir}),
+    _ = [ok = erpc:call(Node, ?MODULE, init_db, []) || Node <- Cluster],
+    [{cluster, Cluster} | Config];
+t_consistent_routing_view_concurrent_cluster_updates('end', Config) ->
+    Cluster = ?config(cluster, Config),
+    ok = emqx_cth_cluster:stop(Cluster).
+
+%% Same scenario as the local concurrent test, with the actors spread over the
+%% three cluster nodes.
+t_consistent_routing_view_concurrent_cluster_updates(Config) ->
+    [N1, N2, N3] = ?config(cluster, Config),
+    ClientAll = <<"t/client/#">>,
+    ClientTwo = <<"t/client/+/+">>,
+    GlobalAll = <<"global/#">>,
+    A1Seq = repeat(10, [
+        reincarnate,
+        {add, ClientAll, id},
+        {add, ClientTwo, id1},
+        {add, ClientTwo, id1},
+        {del, ClientAll, id}
+    ]),
+    A2Seq = repeat(10, [
+        {add, GlobalAll, id},
+        {add, ClientTwo, id1},
+        {add, ClientTwo, id2},
+        {del, ClientTwo, id1},
+        heartbeat
+    ]),
+    A3Seq = repeat(10, [
+        {add, GlobalAll, id},
+        {del, GlobalAll, id},
+        {add, ClientTwo, id1},
+        {del, ClientTwo, id1},
+        {add, ClientTwo, id2},
+        {del, ClientTwo, id2},
+        reincarnate
+    ]),
+    A4Seq = repeat(10, [
+        gc,
+        {sleep, 1}
+    ]),
+    RemoteRuns = [
+        {N1, {{?FUNCTION_NAME, 1}, A1Seq}},
+        {N2, {{?FUNCTION_NAME, 2}, A2Seq}},
+        {N3, {{?FUNCTION_NAME, 3}, A3Seq}},
+        {N3, {{?FUNCTION_NAME, gc}, A4Seq}}
+    ],
+    Runners = [run_remote_actor(Run) || Run <- RemoteRuns],
+    %% Every runner is expected to terminate normally.
+    [?assertReceive({'DOWN', MRef, _, Pid, normal}) || {Pid, MRef} <- Runners],
+    ?assertEqual(
+        [GlobalAll, ClientTwo, ClientTwo],
+        erpc:call(N1, ?MODULE, topics_sorted, [])
+    ).
+
+%% Spawns a monitored process on `Node` executing `run_actor/1`.
+run_remote_actor({Node, Run}) ->
+    erlang:spawn_monitor(Node, ?MODULE, run_actor, [Run]).
+
+%% Initializes `Actor` with incarnation 0 and runs `Seq` step by step. The
+%% 1-based position of a step serves as its timestamp (and as the incarnation
+%% number for `reincarnate` steps).
+run_actor({Actor, Seq}) ->
+    {ok, AS0} = emqx_cluster_link_extrouter:actor_init(Actor, 0, env(0)),
+    Step = fun({TS, Op}, AS) -> run_actor_step(Op, TS, Actor, AS) end,
+    lists:foldl(Step, AS0, lists:enumerate(Seq)).
+
+run_actor_step({OpName, _, _} = Op, TS, _Actor, AS) when OpName =:= add; OpName =:= del ->
+    apply_operation(Op, AS, TS);
+run_actor_step(heartbeat, TS, _Actor, AS) ->
+    apply_operation(heartbeat, AS, TS);
+run_actor_step(gc, TS, _Actor, AS) ->
+    ok = emqx_cluster_link_extrouter:actor_gc(env(TS)),
+    AS;
+run_actor_step({sleep, MS}, _TS, _Actor, AS) ->
+    ok = timer:sleep(MS),
+    AS;
+run_actor_step(reincarnate, TS, Actor, _AS) ->
+    {ok, AS} = emqx_cluster_link_extrouter:actor_init(Actor, TS, env(TS)),
+    AS.
+
+%%
+
+%% Applies `Op` at the default test timestamp (42).
+apply_operation(Op, AS) ->
+    apply_operation(Op, AS, 42).
+
+%% Applies `Op` to the actor state `AS` at timestamp `TS`.
+apply_operation(Op, AS, TS) ->
+    Env = env(TS),
+    emqx_cluster_link_extrouter:actor_apply_operation(Op, AS, Env).
+
+%% Environment with the default test timestamp.
+env() ->
+    env(42).
+
+%% Environment map carrying the given timestamp.
+env(TS) ->
+    #{timestamp => TS}.
+
+%% All topics currently present in the external route table, sorted.
+topics_sorted() ->
+    Topics = emqx_cluster_link_extrouter:topics(),
+    lists:sort(Topics).
+
+%%
+
+%% `N` copies of list `L`, concatenated (and flattened) into a single list.
+repeat(N, L) ->
+    Copies = lists:duplicate(N, L),
+    lists:flatten(Copies).