diff --git a/apps/emqx_cluster_link/src/emqx_cluster_link_api.erl b/apps/emqx_cluster_link/src/emqx_cluster_link_api.erl index 6511cf395..21c435a0f 100644 --- a/apps/emqx_cluster_link/src/emqx_cluster_link_api.erl +++ b/apps/emqx_cluster_link/src/emqx_cluster_link_api.erl @@ -281,7 +281,7 @@ aggregate_metrics(NodeMetrics) -> format_metrics(Node, RouterMetrics, ResourceMetrics) -> Get = fun(Path, Map) -> emqx_utils_maps:deep_get(Path, Map, 0) end, - Routes = Get([counters, ?route_metric], RouterMetrics), + Routes = Get([gauges, ?route_metric], RouterMetrics), #{ node => Node, metrics => #{ diff --git a/apps/emqx_cluster_link/src/emqx_cluster_link_bookkeeper.erl b/apps/emqx_cluster_link/src/emqx_cluster_link_bookkeeper.erl new file mode 100644 index 000000000..992fc7bf1 --- /dev/null +++ b/apps/emqx_cluster_link/src/emqx_cluster_link_bookkeeper.erl @@ -0,0 +1,84 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- +-module(emqx_cluster_link_bookkeeper). + +%% API +-export([ + start_link/0 +]). + +%% `gen_server' API +-export([ + init/1, + handle_continue/2, + handle_call/3, + handle_cast/2, + handle_info/2 +]). + +%%------------------------------------------------------------------------------ +%% Type declarations +%%------------------------------------------------------------------------------ + +%% call/cast/info events +-record(tally_routes, {}). + +%%------------------------------------------------------------------------------ +%% API +%%------------------------------------------------------------------------------ + +-spec start_link() -> gen_server:start_ret(). +start_link() -> + gen_server:start_link({local, ?MODULE}, ?MODULE, _InitOpts = #{}, _Opts = []). + +%%------------------------------------------------------------------------------ +%% `gen_server' API +%%------------------------------------------------------------------------------ + +init(_Opts) -> + State = #{}, + {ok, State, {continue, #tally_routes{}}}. + +handle_continue(#tally_routes{}, State) -> + handle_tally_routes(), + {noreply, State}. + +handle_call(_Call, _From, State) -> + {reply, {error, bad_call}, State}. + +handle_cast(_Cast, State) -> + {noreply, State}. + +handle_info(#tally_routes{}, State) -> + handle_tally_routes(), + {noreply, State}; +handle_info(_Info, State) -> + {noreply, State}. + +%%------------------------------------------------------------------------------ +%% Internal fns +%%------------------------------------------------------------------------------ + +cluster_names() -> + Links = emqx_cluster_link_config:links(), + lists:map(fun(#{name := Name}) -> Name end, Links). + +ensure_timer(Event, Timeout) -> + _ = erlang:send_after(Timeout, self(), Event), + ok. + +handle_tally_routes() -> + ClusterNames = cluster_names(), + tally_routes(ClusterNames), + ensure_timer(#tally_routes{}, emqx_cluster_link_config:tally_routes_interval()), + ok. + +tally_routes([ClusterName | ClusterNames]) -> + Tab = emqx_cluster_link_extrouter:extroute_tab(), + Pat = emqx_cluster_link_extrouter:cluster_routes_ms(ClusterName), + NumRoutes = ets:select_count(Tab, Pat), + emqx_cluster_link_metrics:routes_set(ClusterName, NumRoutes), + tally_routes(ClusterNames); +tally_routes([]) -> + ok. diff --git a/apps/emqx_cluster_link/src/emqx_cluster_link_config.erl b/apps/emqx_cluster_link/src/emqx_cluster_link_config.erl index 2a97f2d69..fd84a5b7f 100644 --- a/apps/emqx_cluster_link/src/emqx_cluster_link_config.erl +++ b/apps/emqx_cluster_link/src/emqx_cluster_link_config.erl @@ -46,7 +46,9 @@ %% Actor Lifecycle actor_ttl/0, actor_gc_interval/0, - actor_heartbeat_interval/0 + actor_heartbeat_interval/0, + %% Metrics + tally_routes_interval/0 ]). -export([ @@ -163,6 +165,10 @@ actor_gc_interval() -> actor_heartbeat_interval() -> actor_ttl() div 3. +-spec tally_routes_interval() -> _Milliseconds :: timeout(). +tally_routes_interval() -> + emqx_config:get([cluster, tally_routes_interval]). + %% mk_emqtt_options(#{server := Server, ssl := #{enable := EnableSsl} = Ssl} = LinkConf) -> diff --git a/apps/emqx_cluster_link/src/emqx_cluster_link_extrouter.erl b/apps/emqx_cluster_link/src/emqx_cluster_link_extrouter.erl index 3e2ff1804..e76b24d79 100644 --- a/apps/emqx_cluster_link/src/emqx_cluster_link_extrouter.erl +++ b/apps/emqx_cluster_link/src/emqx_cluster_link_extrouter.erl @@ -34,6 +34,9 @@ apply_actor_operation/5 ]). +%% Internal export for bookkeeping +-export([cluster_routes_ms/1, extroute_tab/0]). + %% Strictly monotonically increasing integer. -type smint() :: integer(). @@ -147,6 +150,18 @@ make_extroute_rec_pat(Entry) -> [{1, extroute}, {#extroute.entry, Entry}] ). +%% Internal exports for bookkeeping +cluster_routes_ms(ClusterName) -> + TopicPat = '_', + RouteIDPat = '_', + Pat = make_extroute_rec_pat( + emqx_trie_search:make_pat(TopicPat, ?ROUTE_ID(ClusterName, RouteIDPat)) + ), + [{Pat, [], [true]}]. + +extroute_tab() -> + ?EXTROUTE_TAB. + %% -record(state, { @@ -256,33 +271,31 @@ actor_apply_operation( apply_actor_operation(ActorID, Incarnation, Entry, OpName, Lane) -> _ = assert_current_incarnation(ActorID, Incarnation), - apply_operation(ActorID, Entry, OpName, Lane). + apply_operation(Entry, OpName, Lane). -apply_operation(ActorID, Entry, OpName, Lane) -> +apply_operation(Entry, OpName, Lane) -> %% NOTE %% This is safe sequence of operations only on core nodes. On replicants, %% `mria:dirty_update_counter/3` will be replicated asynchronously, which %% means this read can be stale. case mnesia:dirty_read(?EXTROUTE_TAB, Entry) of [#extroute{mcounter = MCounter}] -> - apply_operation(ActorID, Entry, MCounter, OpName, Lane); + apply_operation(Entry, MCounter, OpName, Lane); [] -> - apply_operation(ActorID, Entry, 0, OpName, Lane) + apply_operation(Entry, 0, OpName, Lane) end. -apply_operation(ActorID, Entry, MCounter, OpName, Lane) -> +apply_operation(Entry, MCounter, OpName, Lane) -> %% NOTE %% We are relying on the fact that changes to each individual lane of this %% multi-counter are synchronized. Without this, such counter updates would %% be unsafe. Instead, we would have to use another, more complex approach, %% that runs `ets:lookup/2` + `ets:select_replace/2` in a loop until the %% counter is updated accordingly. - ?ACTOR_ID(ClusterName, _Actor) = ActorID, Marker = 1 bsl Lane, case MCounter band Marker of 0 when OpName =:= add -> Res = mria:dirty_update_counter(?EXTROUTE_TAB, Entry, Marker), - _ = emqx_cluster_link_metrics:routes_inc(ClusterName, 1), ?tp("cluster_link_extrouter_route_added", #{}), Res; Marker when OpName =:= add -> @@ -293,7 +306,6 @@ apply_operation(ActorID, Entry, MCounter, OpName, Lane) -> 0 -> Record = #extroute{entry = Entry, mcounter = 0}, ok = mria:dirty_delete_object(?EXTROUTE_TAB, Record), - _ = emqx_cluster_link_metrics:routes_inc(ClusterName, -1), ?tp("cluster_link_extrouter_route_deleted", #{}), 0; C -> @@ -368,16 +380,16 @@ clean_incarnation(Rec = #actor{id = {Cluster, Actor}}) -> mnesia_clean_incarnation(#actor{id = Actor, incarnation = Incarnation, lane = Lane}) -> case mnesia:read(?EXTROUTE_ACTOR_TAB, Actor, write) of [#actor{incarnation = Incarnation}] -> - _ = clean_lane(Actor, Lane), + _ = clean_lane(Lane), mnesia:delete(?EXTROUTE_ACTOR_TAB, Actor, write); _Renewed -> stale end. -clean_lane(ActorID, Lane) -> +clean_lane(Lane) -> ets:foldl( fun(#extroute{entry = Entry, mcounter = MCounter}, _) -> - apply_operation(ActorID, Entry, MCounter, delete, Lane) + apply_operation(Entry, MCounter, delete, Lane) end, 0, ?EXTROUTE_TAB diff --git a/apps/emqx_cluster_link/src/emqx_cluster_link_metrics.erl b/apps/emqx_cluster_link/src/emqx_cluster_link_metrics.erl index 61e5fc9ce..36c5e791d 100644 --- a/apps/emqx_cluster_link/src/emqx_cluster_link_metrics.erl +++ b/apps/emqx_cluster_link/src/emqx_cluster_link_metrics.erl @@ -11,7 +11,7 @@ drop_metrics/1, get_metrics/1, - routes_inc/2 + routes_set/2 ]). %%-------------------------------------------------------------------- @@ -50,8 +50,10 @@ maybe_create_metrics(ClusterName) -> drop_metrics(ClusterName) -> ok = emqx_metrics_worker:clear_metrics(?METRIC_NAME, ClusterName). -routes_inc(ClusterName, Val) -> - catch emqx_metrics_worker:inc(?METRIC_NAME, ClusterName, ?route_metric, Val). +routes_set(ClusterName, Val) -> + catch emqx_metrics_worker:set_gauge( + ?METRIC_NAME, ClusterName, <<"singleton">>, ?route_metric, Val + ). %%-------------------------------------------------------------------- %% Internal functions diff --git a/apps/emqx_cluster_link/src/emqx_cluster_link_schema.erl b/apps/emqx_cluster_link/src/emqx_cluster_link_schema.erl index a369429d5..dd5ab66f6 100644 --- a/apps/emqx_cluster_link/src/emqx_cluster_link_schema.erl +++ b/apps/emqx_cluster_link/src/emqx_cluster_link_schema.erl @@ -30,7 +30,18 @@ namespace() -> "cluster". roots() -> []. injected_fields() -> - #{cluster => [{links, links_schema(#{})}]}. + #{ + cluster => [ + {links, links_schema(#{})}, + {tally_routes_interval, + hoconsc:mk( + emqx_schema:timeout_duration(), #{ + default => <<"15s">>, + importance => ?IMPORTANCE_HIDDEN + } + )} + ] + }. links_schema(Meta) -> ?HOCON(?ARRAY(?R_REF("link")), Meta#{ diff --git a/apps/emqx_cluster_link/src/emqx_cluster_link_sup.erl b/apps/emqx_cluster_link/src/emqx_cluster_link_sup.erl index 81b5afb4c..42f195cf7 100644 --- a/apps/emqx_cluster_link/src/emqx_cluster_link_sup.erl +++ b/apps/emqx_cluster_link/src/emqx_cluster_link_sup.erl @@ -30,12 +30,13 @@ init(LinksConf) -> period => 5 }, Metrics = emqx_metrics_worker:child_spec(metrics, ?METRIC_NAME), + BookKeeper = bookkeeper_spec(), ExtrouterGC = extrouter_gc_spec(), RouteActors = [ sup_spec(Name, ?ACTOR_MODULE, [LinkConf]) || #{name := Name} = LinkConf <- LinksConf ], - {ok, {SupFlags, [Metrics, ExtrouterGC | RouteActors]}}. + {ok, {SupFlags, [Metrics, BookKeeper, ExtrouterGC | RouteActors]}}. extrouter_gc_spec() -> %% NOTE: This one is currently global, not per-link. @@ -56,6 +57,15 @@ sup_spec(Id, Mod, Args) -> modules => [Mod] }. +bookkeeper_spec() -> + #{ + id => bookkeeper, + start => {emqx_cluster_link_bookkeeper, start_link, []}, + restart => permanent, + type => worker, + shutdown => 5_000 + }. + ensure_actor(#{name := Name} = LinkConf) -> case supervisor:start_child(?SERVER, sup_spec(Name, ?ACTOR_MODULE, [LinkConf])) of {ok, Pid} -> diff --git a/apps/emqx_cluster_link/test/emqx_cluster_link_SUITE.erl b/apps/emqx_cluster_link/test/emqx_cluster_link_SUITE.erl index e023aacab..21aa1a70e 100644 --- a/apps/emqx_cluster_link/test/emqx_cluster_link_SUITE.erl +++ b/apps/emqx_cluster_link/test/emqx_cluster_link_SUITE.erl @@ -53,6 +53,7 @@ mk_source_cluster(BaseName, Config) -> SourceConf = "cluster {" "\n name = cl.source" + "\n tally_routes_interval = 300ms" "\n links = [" "\n { enable = true" "\n name = cl.target" @@ -75,6 +76,7 @@ mk_target_cluster(BaseName, Config) -> TargetConf = "cluster {" "\n name = cl.target" + "\n tally_routes_interval = 300ms" "\n links = [" "\n { enable = true" "\n name = cl.source" diff --git a/apps/emqx_cluster_link/test/emqx_cluster_link_api_SUITE.erl b/apps/emqx_cluster_link/test/emqx_cluster_link_api_SUITE.erl index e8e8f345e..486179af1 100644 --- a/apps/emqx_cluster_link/test/emqx_cluster_link_api_SUITE.erl +++ b/apps/emqx_cluster_link/test/emqx_cluster_link_api_SUITE.erl @@ -600,14 +600,22 @@ t_metrics(Config) -> #{?snk_kind := clink_route_sync_complete} ), - %% Routes = 2 in source cluster, because the target cluster has some topic filters - %% configured and subscribers to them, which were replicated to the source cluster. - ?assertMatch( - {200, #{ - <<"metrics">> := #{<<"routes">> := 2}, - <<"node_metrics">> := _ - }}, - get_metrics(source, SourceName) + %% Routes = 4 in source cluster, because the target cluster has some topic filters + %% configured and subscribers to them, which were replicated to the source cluster, + %% and we have 2 nodes with 2 routes each. + ?retry( + 300, + 10, + ?assertMatch( + {200, #{ + <<"metrics">> := #{<<"routes">> := 4}, + <<"node_metrics">> := [ + #{<<"metrics">> := #{<<"routes">> := 2}}, + #{<<"metrics">> := #{<<"routes">> := 2}} + ] + }}, + get_metrics(source, SourceName) + ) ), ?assertMatch( {200, #{ @@ -627,12 +635,19 @@ t_metrics(Config) -> #{?snk_kind := clink_route_sync_complete} ), - ?assertMatch( - {200, #{ - <<"metrics">> := #{<<"routes">> := 1}, - <<"node_metrics">> := _ - }}, - get_metrics(source, SourceName) + ?retry( + 300, + 10, + ?assertMatch( + {200, #{ + <<"metrics">> := #{<<"routes">> := 2}, + <<"node_metrics">> := [ + #{<<"metrics">> := #{<<"routes">> := 1}}, + #{<<"metrics">> := #{<<"routes">> := 1}} + ] + }}, + get_metrics(source, SourceName) + ) ), %% Disabling the link should remove the routes. @@ -658,12 +673,16 @@ t_metrics(Config) -> #{?snk_kind := "cluster_link_extrouter_route_deleted"} ), - ?assertMatch( - {200, #{ - <<"metrics">> := #{<<"routes">> := 0}, - <<"node_metrics">> := _ - }}, - get_metrics(source, SourceName) + ?retry( + 300, + 10, + ?assertMatch( + {200, #{ + <<"metrics">> := #{<<"routes">> := 0}, + <<"node_metrics">> := _ + }}, + get_metrics(source, SourceName) + ) ), %% Enabling again @@ -678,7 +697,7 @@ t_metrics(Config) -> ?assertMatch( {200, #{ - <<"metrics">> := #{<<"routes">> := 1}, + <<"metrics">> := #{<<"routes">> := 2}, <<"node_metrics">> := _ }}, get_metrics(source, SourceName)