From 0219b8bd4dbcbd79ce3b5e2afce1d0f3debf0eeb Mon Sep 17 00:00:00 2001
From: Andrew Mayorov
Date: Wed, 29 May 2024 16:07:25 +0200
Subject: [PATCH] feat(cluster-link): add simple replication actor GC process

---
Notes (review commentary; text between "---" and the diffstat is not part
of the commit):
    * Adds `emqx_cluster_link_extrouter_gc`, a locally registered gen_server
      started by `emqx_cluster_link_sup`. It calls
      `emqx_cluster_link_extrouter:actor_gc/1` on a timer. On a timer tick
      only the node that sorts first among
      `mria_membership:running_core_nodelist()` runs the GC; an explicit
      `run/0` call always runs it.
    * `actor_gc/1` now returns the number of cleaned actors (0 or 1)
      instead of `ok`.
    * The actor TTL (30_000 ms) moves to `emqx_cluster_link_config`. The GC
      interval equals the TTL and the heartbeat interval is `TTL div 3`,
      i.e. the same 10_000 ms as the removed `?HEARTBEAT_INTERVAL`.

 .../src/emqx_cluster_link_config.erl          | 20 +++-
 .../src/emqx_cluster_link_extrouter.erl       | 31 +++---
 .../src/emqx_cluster_link_extrouter_gc.erl    | 95 +++++++++++++++++++
 .../src/emqx_cluster_link_router_syncer.erl   |  4 +-
 .../src/emqx_cluster_link_sup.erl             | 16 +++-
 .../emqx_cluster_link_extrouter_SUITE.erl     | 12 ++-
 6 files changed, 153 insertions(+), 25 deletions(-)
 create mode 100644 apps/emqx_cluster_link/src/emqx_cluster_link_extrouter_gc.erl

diff --git a/apps/emqx_cluster_link/src/emqx_cluster_link_config.erl b/apps/emqx_cluster_link/src/emqx_cluster_link_config.erl
index c81c7e2e8..9d840256a 100644
--- a/apps/emqx_cluster_link/src/emqx_cluster_link_config.erl
+++ b/apps/emqx_cluster_link/src/emqx_cluster_link_config.erl
@@ -13,6 +13,8 @@
 
 -define(MQTT_HOST_OPTS, #{default_port => 1883}).
 
+-define(DEFAULT_ACTOR_TTL, 30_000).
+
 -export([
     %% General
     cluster/0,
@@ -22,7 +24,11 @@
     topic_filters/1,
     %% Connections
     emqtt_options/1,
-    mk_emqtt_options/1
+    mk_emqtt_options/1,
+    %% Actor Lifecycle
+    actor_ttl/0,
+    actor_gc_interval/0,
+    actor_heartbeat_interval/0
 ]).
 
 -export([
@@ -58,6 +64,18 @@ emqtt_options(LinkName) ->
 topic_filters(LinkName) ->
     maps:get(topics, ?MODULE:link(LinkName), []).
 
+-spec actor_ttl() -> _Milliseconds :: pos_integer().
+actor_ttl() ->
+    ?DEFAULT_ACTOR_TTL.
+
+-spec actor_gc_interval() -> _Milliseconds :: pos_integer().
+actor_gc_interval() ->
+    actor_ttl().
+
+-spec actor_heartbeat_interval() -> _Milliseconds :: pos_integer().
+actor_heartbeat_interval() ->
+    actor_ttl() div 3.
+
 %%
 
 mk_emqtt_options(#{server := Server, ssl := #{enable := EnableSsl} = Ssl} = LinkConf) ->
diff --git a/apps/emqx_cluster_link/src/emqx_cluster_link_extrouter.erl b/apps/emqx_cluster_link/src/emqx_cluster_link_extrouter.erl
index f060f4c56..e058cb816 100644
--- a/apps/emqx_cluster_link/src/emqx_cluster_link_extrouter.erl
+++ b/apps/emqx_cluster_link/src/emqx_cluster_link_extrouter.erl
@@ -67,8 +67,6 @@
 
 -include_lib("emqx/include/emqx.hrl").
 
--define(DEFAULT_ACTOR_TTL_MS, 30_000).
-
 -define(EXTROUTE_SHARD, ?MODULE).
 -define(EXTROUTE_TAB, emqx_external_router_route).
 -define(EXTROUTE_ACTOR_TAB, emqx_external_router_actor).
@@ -280,16 +278,22 @@ apply_operation(Entry, MCounter, OpName, Lane) ->
             MCounter
     end.
 
--spec actor_gc(env()) -> ok.
+-spec actor_gc(env()) -> _NumCleaned :: non_neg_integer().
 actor_gc(#{timestamp := Now}) ->
     MS = [{#actor{until = '$1', _ = '_'}, [{'<', '$1', Now}], ['$_']}],
-    case mnesia:dirty_select(?EXTROUTE_ACTOR_TAB, MS) of
-        [Rec | _Rest] ->
-            %% NOTE: One at a time.
-            clean_incarnation(Rec);
-        [] ->
-            ok
-    end.
+    Dead = mnesia:dirty_select(?EXTROUTE_ACTOR_TAB, MS),
+    try_clean_incarnation(Dead).
+
+try_clean_incarnation([Rec | Rest]) ->
+    %% NOTE: Clean at most one actor per call; on `stale` try the next one.
+    case clean_incarnation(Rec) of
+        ok ->
+            1;
+        stale ->
+            try_clean_incarnation(Rest)
+    end;
+try_clean_incarnation([]) ->
+    0.
 
 mnesia_assign_lane(Cluster) ->
     Assignment = lists:foldl(
@@ -323,7 +327,7 @@ mnesia_clean_incarnation(#actor{id = Actor, incarnation = Incarnation, lane = La
             _ = clean_lane(Lane),
             mnesia:delete(?EXTROUTE_ACTOR_TAB, Actor, write);
         _Renewed ->
-            ok
+            stale
     end.
 
 clean_lane(Lane) ->
@@ -368,7 +372,4 @@ first_zero_bit(N, I) ->
 %%
 
 bump_actor_ttl(TS) ->
-    TS + get_actor_ttl().
-
-get_actor_ttl() ->
-    ?DEFAULT_ACTOR_TTL_MS.
+    TS + emqx_cluster_link_config:actor_ttl().
diff --git a/apps/emqx_cluster_link/src/emqx_cluster_link_extrouter_gc.erl b/apps/emqx_cluster_link/src/emqx_cluster_link_extrouter_gc.erl
new file mode 100644
index 000000000..e185c5137
--- /dev/null
+++ b/apps/emqx_cluster_link/src/emqx_cluster_link_extrouter_gc.erl
@@ -0,0 +1,95 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+
+-module(emqx_cluster_link_extrouter_gc).
+
+-include_lib("emqx/include/logger.hrl").
+
+-export([start_link/0]).
+
+-export([run/0]).
+
+-behaviour(gen_server).
+-export([
+    init/1,
+    handle_call/3,
+    handle_cast/2,
+    handle_info/2
+]).
+
+-define(SERVER, ?MODULE).
+
+-define(REPEAT_GC_INTERVAL, 5_000).
+
+%% API: start the locally registered GC server / run one GC pass right now.
+
+start_link() ->
+    gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
+
+run() ->
+    gen_server:call(?SERVER, run).
+
+%% gen_server state and callbacks; `gc_timer` is `undefined` while no timer is armed.
+
+-record(st, {
+    gc_timer = undefined :: undefined | reference()
+}).
+
+init(_) ->
+    {ok, schedule_gc(#st{})}.
+
+handle_call(run, _From, St) ->
+    Result = run_gc(),
+    Timeout = choose_timeout(Result),
+    {reply, Result, reschedule_gc(Timeout, St)};
+handle_call(_Call, _From, St) ->
+    {reply, ignored, St}.
+
+handle_cast(Cast, State) ->
+    ?SLOG(warning, #{msg => "unexpected_cast", cast => Cast}),
+    {noreply, State}.
+
+handle_info({timeout, TRef, _GC}, St = #st{gc_timer = TRef}) ->
+    Result = run_gc_exclusive(),
+    Timeout = choose_timeout(Result),
+    {noreply, schedule_gc(Timeout, St#st{gc_timer = undefined})};
+handle_info(Info, St) ->
+    ?SLOG(warning, #{msg => "unexpected_info", info => Info}),
+    {noreply, St}.
+
+%% Internal functions: timer-driven GC runs only on the first (sorted) running core node.
+
+run_gc_exclusive() ->
+    case is_responsible() of
+        true -> run_gc();
+        false -> 0
+    end.
+
+is_responsible() ->
+    Nodes = lists:sort(mria_membership:running_core_nodelist()),
+    Nodes =/= [] andalso hd(Nodes) =:= node().
+
+-spec run_gc() -> _NumCleaned :: non_neg_integer().
+run_gc() ->
+    Env = #{timestamp => erlang:system_time(millisecond)},
+    emqx_cluster_link_extrouter:actor_gc(Env).
+
+choose_timeout(_NumCleaned = 0) ->
+    emqx_cluster_link_config:actor_gc_interval();
+choose_timeout(_NumCleaned) ->
+    %% NOTE: Something was cleaned, so more outdated actors are likely: retry sooner.
+    ?REPEAT_GC_INTERVAL.
+
+schedule_gc(St) ->
+    schedule_gc(emqx_cluster_link_config:actor_gc_interval(), St).
+
+schedule_gc(Timeout, St = #st{gc_timer = undefined}) ->
+    TRef = erlang:start_timer(Timeout, self(), gc),
+    St#st{gc_timer = TRef}.
+
+reschedule_gc(Timeout, St = #st{gc_timer = undefined}) ->
+    schedule_gc(Timeout, St);
+reschedule_gc(Timeout, St = #st{gc_timer = TRef}) ->
+    ok = emqx_utils:cancel_timer(TRef),
+    schedule_gc(Timeout, St#st{gc_timer = undefined}).
diff --git a/apps/emqx_cluster_link/src/emqx_cluster_link_router_syncer.erl b/apps/emqx_cluster_link/src/emqx_cluster_link_router_syncer.erl
index 2e6f63834..ffce01812 100644
--- a/apps/emqx_cluster_link/src/emqx_cluster_link_router_syncer.erl
+++ b/apps/emqx_cluster_link/src/emqx_cluster_link_router_syncer.erl
@@ -54,7 +54,6 @@
 
 -define(RECONNECT_TIMEOUT, 5_000).
 -define(ACTOR_REINIT_TIMEOUT, 7000).
--define(HEARTBEAT_INTERVAL, 10_000).
 
 -define(CLIENT_SUFFIX, ":routesync:").
 -define(PS_CLIENT_SUFFIX, ":routesync-ps:").
@@ -475,7 +474,8 @@ process_heartbeat(St = #st{client = ClientPid, actor = Actor, incarnation = Inca
     schedule_heartbeat(St).
 
 schedule_heartbeat(St = #st{heartbeat_timer = undefined}) ->
-    TRef = erlang:start_timer(?HEARTBEAT_INTERVAL, self(), heartbeat),
+    Timeout = emqx_cluster_link_config:actor_heartbeat_interval(),
+    TRef = erlang:start_timer(Timeout, self(), heartbeat),
     St#st{heartbeat_timer = TRef}.
 
 %% Bootstrapping.
diff --git a/apps/emqx_cluster_link/src/emqx_cluster_link_sup.erl b/apps/emqx_cluster_link/src/emqx_cluster_link_sup.erl
index beb641a92..872054fa0 100644
--- a/apps/emqx_cluster_link/src/emqx_cluster_link_sup.erl
+++ b/apps/emqx_cluster_link/src/emqx_cluster_link_sup.erl
@@ -10,7 +10,6 @@
 
 -export([init/1]).
 
--define(COORD_SUP, emqx_cluster_link_coord_sup).
 -define(SERVER, ?MODULE).
 
 start_link(LinksConf) ->
@@ -22,12 +21,21 @@ init(LinksConf) ->
         intensity => 10,
         period => 5
     },
-    %% Children = [sup_spec(?COORD_SUP, ?COORD_SUP, LinksConf)],
-    Children = [
+    ExtrouterGC = extrouter_gc_spec(),
+    RouteActors = [
         sup_spec(Name, emqx_cluster_link_router_syncer, [Name])
      || #{upstream := Name} <- LinksConf
     ],
-    {ok, {SupFlags, Children}}.
+    {ok, {SupFlags, [ExtrouterGC | RouteActors]}}.
+
+extrouter_gc_spec() ->
+    %% NOTE: A single GC worker serves all links; it is global, not per-link.
+    #{
+        id => {extrouter, gc},
+        start => {emqx_cluster_link_extrouter_gc, start_link, []},
+        restart => permanent,
+        type => worker
+    }.
sup_spec(Id, Mod, Args) -> #{ diff --git a/apps/emqx_cluster_link/test/emqx_cluster_link_extrouter_SUITE.erl b/apps/emqx_cluster_link/test/emqx_cluster_link_extrouter_SUITE.erl index 9f80109fd..bb281ce4c 100644 --- a/apps/emqx_cluster_link/test/emqx_cluster_link_extrouter_SUITE.erl +++ b/apps/emqx_cluster_link/test/emqx_cluster_link_extrouter_SUITE.erl @@ -124,7 +124,10 @@ t_actor_gc(_Config) -> topics_sorted() ), _AS13 = apply_operation(heartbeat, AS12, 50_000), - ok = emqx_cluster_link_extrouter:actor_gc(env(60_000)), + ?assertEqual( + 1, + emqx_cluster_link_extrouter:actor_gc(env(60_000)) + ), ?assertEqual( [<<"topic/#">>, <<"topic/42/+">>], topics_sorted() @@ -133,7 +136,10 @@ t_actor_gc(_Config) -> _IncarnationMismatch, apply_operation({add, {<<"toolate/#">>, id}}, AS21) ), - ok = emqx_cluster_link_extrouter:actor_gc(env(120_000)), + ?assertEqual( + 1, + emqx_cluster_link_extrouter:actor_gc(env(120_000)) + ), ?assertEqual( [], topics_sorted() @@ -273,7 +279,7 @@ run_actor({Actor, Seq}) -> ({TS, heartbeat}, AS) -> apply_operation(heartbeat, AS, TS); ({TS, gc}, AS) -> - ok = emqx_cluster_link_extrouter:actor_gc(env(TS)), + _NC = emqx_cluster_link_extrouter:actor_gc(env(TS)), AS; ({_TS, {sleep, MS}}, AS) -> ok = timer:sleep(MS),