From 6a5849488c767a567ba39a6ec785b728948c8164 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Wed, 17 Jul 2024 18:01:52 -0300 Subject: [PATCH] feat(cluster link): add metrics Fixes https://emqx.atlassian.net/browse/EMQX-12627 --- .../include/emqx_cluster_link.hrl | 4 + .../src/emqx_cluster_link.erl | 57 ++++ .../src/emqx_cluster_link_api.erl | 106 ++++++- .../src/emqx_cluster_link_app.erl | 12 +- .../src/emqx_cluster_link_config.erl | 6 +- .../src/emqx_cluster_link_extrouter.erl | 24 +- .../src/emqx_cluster_link_sup.erl | 5 +- .../test/emqx_cluster_link_api_SUITE.erl | 265 +++++++++++++++++- 8 files changed, 450 insertions(+), 29 deletions(-) diff --git a/apps/emqx_cluster_link/include/emqx_cluster_link.hrl b/apps/emqx_cluster_link/include/emqx_cluster_link.hrl index 08dc7f4ad..32c675d8d 100644 --- a/apps/emqx_cluster_link/include/emqx_cluster_link.hrl +++ b/apps/emqx_cluster_link/include/emqx_cluster_link.hrl @@ -17,3 +17,7 @@ %% Fairly compact text encoding. -define(SHARED_ROUTE_ID(Topic, Group), <<"$s/", Group/binary, "/", Topic/binary>>). -define(PERSISTENT_ROUTE_ID(Topic, ID), <<"$p/", ID/binary, "/", Topic/binary>>). + +-define(METRIC_NAME, cluster_link). + +-define(route_metric, 'routes'). diff --git a/apps/emqx_cluster_link/src/emqx_cluster_link.erl b/apps/emqx_cluster_link/src/emqx_cluster_link.erl index 76228c052..e3bc04a29 100644 --- a/apps/emqx_cluster_link/src/emqx_cluster_link.erl +++ b/apps/emqx_cluster_link/src/emqx_cluster_link.erl @@ -26,11 +26,29 @@ on_message_publish/1 ]). +%% metrics API +-export([ + maybe_create_metrics/1, + drop_metrics/1, + + get_metrics/1, + routes_inc/2 +]). + -include("emqx_cluster_link.hrl"). -include_lib("emqx/include/emqx.hrl"). -include_lib("emqx/include/emqx_hooks.hrl"). -include_lib("emqx/include/logger.hrl"). +%%-------------------------------------------------------------------- +%% Type definitions +%%-------------------------------------------------------------------- + +-define(METRICS, [ + ?route_metric +]). +-define(RATE_METRICS, []). + %%-------------------------------------------------------------------- %% emqx_external_broker API %%-------------------------------------------------------------------- @@ -132,6 +150,32 @@ put_hook() -> delete_hook() -> emqx_hooks:del('message.publish', {?MODULE, on_message_publish, []}). +%%-------------------------------------------------------------------- +%% metrics API +%%-------------------------------------------------------------------- + +get_metrics(ClusterName) -> + Nodes = emqx:running_nodes(), + Timeout = 15_000, + Results = emqx_metrics_proto_v2:get_metrics(Nodes, ?METRIC_NAME, ClusterName, Timeout), + sequence_multicall_results(Nodes, Results). + +maybe_create_metrics(ClusterName) -> + case emqx_metrics_worker:has_metrics(?METRIC_NAME, ClusterName) of + true -> + ok = emqx_metrics_worker:reset_metrics(?METRIC_NAME, ClusterName); + false -> + ok = emqx_metrics_worker:create_metrics( + ?METRIC_NAME, ClusterName, ?METRICS, ?RATE_METRICS + ) + end. + +drop_metrics(ClusterName) -> + ok = emqx_metrics_worker:clear_metrics(?METRIC_NAME, ClusterName). + +routes_inc(ClusterName, Val) -> + catch emqx_metrics_worker:inc(?METRIC_NAME, ClusterName, ?route_metric, Val). + %%-------------------------------------------------------------------- %% Internal functions %%-------------------------------------------------------------------- @@ -253,3 +297,16 @@ maybe_filter_incomming_msg(#message{topic = T} = Msg, ClusterName) -> true -> with_sender_name(Msg, ClusterName); false -> [] end. + +-spec sequence_multicall_results([node()], emqx_rpc:erpc_multicall(term())) -> + {ok, [{node(), term()}]} | {error, [term()]}. +sequence_multicall_results(Nodes, Results) -> + case lists:partition(fun is_ok/1, lists:zip(Nodes, Results)) of + {OkResults, []} -> + {ok, [{Node, Res} || {Node, {ok, Res}} <- OkResults]}; + {_OkResults, BadResults} -> + {error, BadResults} + end. + +is_ok({_Node, {ok, _}}) -> true; +is_ok(_) -> false. diff --git a/apps/emqx_cluster_link/src/emqx_cluster_link_api.erl b/apps/emqx_cluster_link/src/emqx_cluster_link_api.erl index a184ab36d..77a77610b 100644 --- a/apps/emqx_cluster_link/src/emqx_cluster_link_api.erl +++ b/apps/emqx_cluster_link/src/emqx_cluster_link_api.erl @@ -10,6 +10,7 @@ -include_lib("emqx_utils/include/emqx_utils_api.hrl"). -include_lib("emqx_resource/include/emqx_resource.hrl"). -include_lib("emqx/include/logger.hrl"). +-include("emqx_cluster_link.hrl"). -export([ api_spec/0, @@ -21,7 +22,8 @@ -export([ '/cluster/links'/2, - '/cluster/links/:name'/2 + '/cluster/links/:name'/2, + '/cluster/links/:name/metrics'/2 ]). -define(CONF_PATH, [cluster, links]). @@ -37,7 +39,8 @@ api_spec() -> paths() -> [ "/cluster/links", - "/cluster/links/:name" + "/cluster/links/:name", + "/cluster/links/:name/metrics" ]. schema("/cluster/links") -> @@ -113,6 +116,23 @@ schema("/cluster/links/:name") -> ) } } + }; +schema("/cluster/links/:name/metrics") -> + #{ + 'operationId' => '/cluster/links/:name/metrics', + get => + #{ + description => "Get a cluster link metrics", + tags => ?TAGS, + parameters => [param_path_name()], + responses => + #{ + 200 => link_metrics_schema_response(), + 404 => emqx_dashboard_swagger:error_codes( + [?NOT_FOUND], <<"Cluster link not found">> + ) + } + } }. fields(link_config_response) -> @@ -120,6 +140,24 @@ fields(link_config_response) -> {node, hoconsc:mk(binary(), #{desc => ?DESC("node")})}, {status, hoconsc:mk(status(), #{desc => ?DESC("status")})} | emqx_cluster_link_schema:fields("link") + ]; +fields(metrics) -> + [ + {metrics, hoconsc:mk(map(), #{desc => ?DESC("metrics")})} + ]; +fields(link_metrics_response) -> + [ + {node_metrics, + hoconsc:mk( + hoconsc:array(hoconsc:ref(?MODULE, node_metrics)), + #{desc => ?DESC("node_metrics")} + )} + | fields(metrics) + ]; +fields(node_metrics) -> + [ + {node, hoconsc:mk(atom(), #{desc => ?DESC("node")})} + | fields(metrics) ]. %%-------------------------------------------------------------------- @@ -154,6 +192,9 @@ fields(link_config_response) -> not_found() ). +'/cluster/links/:name/metrics'(get, #{bindings := #{name := Name}}) -> + with_link(Name, fun() -> handle_metrics(Name) end, not_found()). + %%-------------------------------------------------------------------- %% Internal funcs %%-------------------------------------------------------------------- @@ -185,6 +226,46 @@ handle_create(Name, Params) -> handle_lookup(Name, Link) -> ?OK(add_status(Name, Link)). +handle_metrics(Name) -> + case emqx_cluster_link:get_metrics(Name) of + {error, BadResults} -> + ?SLOG(warning, #{ + msg => "cluster_link_api_metrics_bad_erpc_results", + results => BadResults + }), + ?OK(#{metrics => #{}, node_metrics => []}); + {ok, NodeResults} -> + NodeMetrics = + lists:map( + fun({Node, Metrics}) -> + format_metrics(Node, Metrics) + end, + NodeResults + ), + AggregatedMetrics = aggregate_metrics(NodeMetrics), + Response = #{metrics => AggregatedMetrics, node_metrics => NodeMetrics}, + ?OK(Response) + end. + +aggregate_metrics(NodeMetrics) -> + ErrorLogger = fun(_) -> ok end, + lists:foldl( + fun(#{metrics := Metrics}, Acc) -> + emqx_utils_maps:best_effort_recursive_sum(Metrics, Acc, ErrorLogger) + end, + #{}, + NodeMetrics + ). + +format_metrics(Node, Metrics) -> + Routes = emqx_utils_maps:deep_get([counters, ?route_metric], Metrics, 0), + #{ + node => Node, + metrics => #{ + ?route_metric => Routes + } + }. + add_status(Name, Link) -> NodeResults = get_link_status_cluster(Name), Status = collect_single_status(NodeResults), @@ -330,6 +411,16 @@ link_config_schema_response() -> } ). +link_metrics_schema_response() -> + hoconsc:mk( + hoconsc:ref(?MODULE, link_metrics_response), + #{ + examples => #{ + <<"example">> => link_metrics_response_example() + } + } + ). + status() -> hoconsc:enum([?status_connected, ?status_disconnected, ?status_connecting, inconsistent]). @@ -392,6 +483,17 @@ links_config_example() -> } ]. +link_metrics_response_example() -> + #{ + <<"metrics">> => #{<<"routes">> => 10240}, + <<"node_metrics">> => [ + #{ + <<"node">> => <<"emqx1@emqx.net">>, + <<"metrics">> => #{<<"routes">> => 10240} + } + ] + }. + with_link(Name, FoundFn, NotFoundFn) -> case emqx_cluster_link_config:link_raw(Name) of undefined -> diff --git a/apps/emqx_cluster_link/src/emqx_cluster_link_app.erl b/apps/emqx_cluster_link/src/emqx_cluster_link_app.erl index 41f1a0a77..f9625fae4 100644 --- a/apps/emqx_cluster_link/src/emqx_cluster_link_app.erl +++ b/apps/emqx_cluster_link/src/emqx_cluster_link_app.erl @@ -22,7 +22,9 @@ start(_StartType, _StartArgs) -> _ -> ok end, - emqx_cluster_link_sup:start_link(LinksConf). + {ok, Sup} = emqx_cluster_link_sup:start_link(LinksConf), + ok = create_metrics(LinksConf), + {ok, Sup}. prep_stop(State) -> emqx_cluster_link_config:remove_handler(), @@ -53,3 +55,11 @@ remove_msg_fwd_resources(LinksConf) -> end, LinksConf ). + +create_metrics(LinksConf) -> + lists:foreach( + fun(#{name := ClusterName}) -> + ok = emqx_cluster_link:maybe_create_metrics(ClusterName) + end, + LinksConf + ). diff --git a/apps/emqx_cluster_link/src/emqx_cluster_link_config.erl b/apps/emqx_cluster_link/src/emqx_cluster_link_config.erl index 36655460b..0455ab21c 100644 --- a/apps/emqx_cluster_link/src/emqx_cluster_link_config.erl +++ b/apps/emqx_cluster_link/src/emqx_cluster_link_config.erl @@ -277,9 +277,10 @@ all_ok(Results) -> add_links(LinksConf) -> [add_link(Link) || Link <- LinksConf]. -add_link(#{enable := true} = LinkConf) -> +add_link(#{name := ClusterName, enable := true} = LinkConf) -> {ok, _Pid} = emqx_cluster_link_sup:ensure_actor(LinkConf), {ok, _} = emqx_cluster_link_mqtt:ensure_msg_fwd_resource(LinkConf), + ok = emqx_cluster_link:maybe_create_metrics(ClusterName), ok; add_link(_DisabledLinkConf) -> ok. @@ -289,7 +290,8 @@ remove_links(LinksConf) -> remove_link(Name) -> _ = emqx_cluster_link_mqtt:remove_msg_fwd_resource(Name), - ensure_actor_stopped(Name). + _ = ensure_actor_stopped(Name), + emqx_cluster_link:drop_metrics(Name). update_links(LinksConf) -> [update_link(Link) || Link <- LinksConf]. diff --git a/apps/emqx_cluster_link/src/emqx_cluster_link_extrouter.erl b/apps/emqx_cluster_link/src/emqx_cluster_link_extrouter.erl index 79d96e207..c45b12ae0 100644 --- a/apps/emqx_cluster_link/src/emqx_cluster_link_extrouter.erl +++ b/apps/emqx_cluster_link/src/emqx_cluster_link_extrouter.erl @@ -256,31 +256,35 @@ actor_apply_operation( apply_actor_operation(ActorID, Incarnation, Entry, OpName, Lane) -> _ = assert_current_incarnation(ActorID, Incarnation), - apply_operation(Entry, OpName, Lane). + apply_operation(ActorID, Entry, OpName, Lane). -apply_operation(Entry, OpName, Lane) -> +apply_operation(ActorID, Entry, OpName, Lane) -> %% NOTE %% This is safe sequence of operations only on core nodes. On replicants, %% `mria:dirty_update_counter/3` will be replicated asynchronously, which %% means this read can be stale. case mnesia:dirty_read(?EXTROUTE_TAB, Entry) of [#extroute{mcounter = MCounter}] -> - apply_operation(Entry, MCounter, OpName, Lane); + apply_operation(ActorID, Entry, MCounter, OpName, Lane); [] -> - apply_operation(Entry, 0, OpName, Lane) + apply_operation(ActorID, Entry, 0, OpName, Lane) end. -apply_operation(Entry, MCounter, OpName, Lane) -> +apply_operation(ActorID, Entry, MCounter, OpName, Lane) -> %% NOTE %% We are relying on the fact that changes to each individual lane of this %% multi-counter are synchronized. Without this, such counter updates would %% be unsafe. Instead, we would have to use another, more complex approach, %% that runs `ets:lookup/2` + `ets:select_replace/2` in a loop until the %% counter is updated accordingly. + ?ACTOR_ID(ClusterName, _Actor) = ActorID, Marker = 1 bsl Lane, case MCounter band Marker of 0 when OpName =:= add -> - mria:dirty_update_counter(?EXTROUTE_TAB, Entry, Marker); + Res = mria:dirty_update_counter(?EXTROUTE_TAB, Entry, Marker), + _ = emqx_cluster_link:routes_inc(ClusterName, 1), + ?tp("cluster_link_extrouter_route_added", #{}), + Res; Marker when OpName =:= add -> %% Already added. MCounter; @@ -289,6 +293,8 @@ apply_operation(Entry, MCounter, OpName, Lane) -> 0 -> Record = #extroute{entry = Entry, mcounter = 0}, ok = mria:dirty_delete_object(?EXTROUTE_TAB, Record), + _ = emqx_cluster_link:routes_inc(ClusterName, -1), + ?tp("cluster_link_extrouter_route_deleted", #{}), 0; C -> C @@ -362,16 +368,16 @@ clean_incarnation(Rec = #actor{id = {Cluster, Actor}}) -> mnesia_clean_incarnation(#actor{id = Actor, incarnation = Incarnation, lane = Lane}) -> case mnesia:read(?EXTROUTE_ACTOR_TAB, Actor, write) of [#actor{incarnation = Incarnation}] -> - _ = clean_lane(Lane), + _ = clean_lane(Actor, Lane), mnesia:delete(?EXTROUTE_ACTOR_TAB, Actor, write); _Renewed -> stale end. -clean_lane(Lane) -> +clean_lane(ActorID, Lane) -> ets:foldl( fun(#extroute{entry = Entry, mcounter = MCounter}, _) -> - apply_operation(Entry, MCounter, delete, Lane) + apply_operation(ActorID, Entry, MCounter, delete, Lane) end, 0, ?EXTROUTE_TAB diff --git a/apps/emqx_cluster_link/src/emqx_cluster_link_sup.erl b/apps/emqx_cluster_link/src/emqx_cluster_link_sup.erl index 2025510fc..81b5afb4c 100644 --- a/apps/emqx_cluster_link/src/emqx_cluster_link_sup.erl +++ b/apps/emqx_cluster_link/src/emqx_cluster_link_sup.erl @@ -6,6 +6,8 @@ -behaviour(supervisor). +-include("emqx_cluster_link.hrl"). + -export([start_link/1]). -export([ @@ -27,12 +29,13 @@ init(LinksConf) -> intensity => 10, period => 5 }, + Metrics = emqx_metrics_worker:child_spec(metrics, ?METRIC_NAME), ExtrouterGC = extrouter_gc_spec(), RouteActors = [ sup_spec(Name, ?ACTOR_MODULE, [LinkConf]) || #{name := Name} = LinkConf <- LinksConf ], - {ok, {SupFlags, [ExtrouterGC | RouteActors]}}. + {ok, {SupFlags, [Metrics, ExtrouterGC | RouteActors]}}. extrouter_gc_spec() -> %% NOTE: This one is currently global, not per-link. diff --git a/apps/emqx_cluster_link/test/emqx_cluster_link_api_SUITE.erl b/apps/emqx_cluster_link/test/emqx_cluster_link_api_SUITE.erl index 535e8521a..f4a15a62a 100644 --- a/apps/emqx_cluster_link/test/emqx_cluster_link_api_SUITE.erl +++ b/apps/emqx_cluster_link/test/emqx_cluster_link_api_SUITE.erl @@ -11,6 +11,8 @@ -include_lib("common_test/include/ct.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl"). +-import(emqx_common_test_helpers, [on_exit/1]). + -define(API_PATH, emqx_mgmt_api_test_util:api_path(["cluster", "links"])). -define(CONF_PATH, [cluster, links]). @@ -44,7 +46,21 @@ %%------------------------------------------------------------------------------ all() -> - emqx_common_test_helpers:all(?MODULE). + AllTCs = emqx_common_test_helpers:all(?MODULE), + OtherTCs = AllTCs -- cluster_test_cases(), + [ + {group, cluster} + | OtherTCs + ]. + +groups() -> + [{cluster, cluster_test_cases()}]. + +cluster_test_cases() -> + [ + t_status, + t_metrics + ]. init_per_suite(Config) -> %% This is called by emqx_machine in EMQX release @@ -66,30 +82,35 @@ end_per_suite(Config) -> emqx_config:delete_override_conf_files(), ok. -auth_header() -> - emqx_mgmt_api_test_util:auth_header_(). - -init_per_testcase(t_status = TestCase, Config) -> +init_per_group(cluster = Group, Config) -> ok = emqx_cth_suite:stop_apps([emqx_dashboard]), - SourceClusterSpec = emqx_cluster_link_SUITE:mk_source_cluster(TestCase, Config), - TargetClusterSpec = emqx_cluster_link_SUITE:mk_target_cluster(TestCase, Config), + SourceClusterSpec = emqx_cluster_link_SUITE:mk_source_cluster(Group, Config), + TargetClusterSpec = emqx_cluster_link_SUITE:mk_target_cluster(Group, Config), SourceNodes = [SN1 | _] = emqx_cth_cluster:start(SourceClusterSpec), - TargetNodes = emqx_cth_cluster:start(TargetClusterSpec), + TargetNodes = [TN1 | _] = emqx_cth_cluster:start(TargetClusterSpec), emqx_cluster_link_SUITE:start_cluster_link(SourceNodes ++ TargetNodes, Config), erpc:call(SN1, emqx_cth_suite, start_apps, [ [emqx_management, emqx_mgmt_api_test_util:emqx_dashboard()], - #{work_dir => emqx_cth_suite:work_dir(TestCase, Config)} + #{work_dir => emqx_cth_suite:work_dir(Group, Config)} + ]), + erpc:call(TN1, emqx_cth_suite, start_apps, [ + [ + emqx_management, + emqx_mgmt_api_test_util:emqx_dashboard( + "dashboard.listeners.http { enable = true, bind = 28083 }" + ) + ], + #{work_dir => emqx_cth_suite:work_dir(Group, Config)} ]), [ {source_nodes, SourceNodes}, {target_nodes, TargetNodes} | Config ]; -init_per_testcase(_TC, Config) -> - {ok, _} = emqx_cluster_link_config:update([]), +init_per_group(_Group, Config) -> Config. -end_per_testcase(t_status, Config) -> +end_per_group(cluster, Config) -> SourceNodes = ?config(source_nodes, Config), TargetNodes = ?config(target_nodes, Config), ok = emqx_cth_cluster:stop(SourceNodes), @@ -99,7 +120,20 @@ end_per_testcase(t_status, Config) -> #{work_dir => emqx_cth_suite:work_dir(Config)} ), ok; +end_per_group(_Group, _Config) -> + ok. + +auth_header() -> + emqx_mgmt_api_test_util:auth_header_(). + +init_per_testcase(_TC, Config) -> + {ok, _} = emqx_cluster_link_config:update([]), + snabbkaffe:start_trace(), + Config. + end_per_testcase(_TC, _Config) -> + snabbkaffe:stop(), + emqx_common_test_helpers:call_janitor(), ok. %%------------------------------------------------------------------------------ @@ -114,7 +148,11 @@ list() -> emqx_mgmt_api_test_util:simple_request(get, Path, _Params = ""). get_link(Name) -> - Path = emqx_mgmt_api_test_util:api_path([api_root(), Name]), + get_link(source, Name). + +get_link(SourceOrTargetCluster, Name) -> + Host = host(SourceOrTargetCluster), + Path = emqx_mgmt_api_test_util:api_path(Host, [api_root(), Name]), emqx_mgmt_api_test_util:simple_request(get, Path, _Params = ""). delete_link(Name) -> @@ -122,7 +160,11 @@ delete_link(Name) -> emqx_mgmt_api_test_util:simple_request(delete, Path, _Params = ""). update_link(Name, Params) -> - Path = emqx_mgmt_api_test_util:api_path([api_root(), Name]), + update_link(source, Name, Params). + +update_link(SourceOrTargetCluster, Name, Params) -> + Host = host(SourceOrTargetCluster), + Path = emqx_mgmt_api_test_util:api_path(Host, [api_root(), Name]), emqx_mgmt_api_test_util:simple_request(put, Path, Params). create_link(Name, Params0) -> @@ -130,6 +172,17 @@ create_link(Name, Params0) -> Path = emqx_mgmt_api_test_util:api_path([api_root()]), emqx_mgmt_api_test_util:simple_request(post, Path, Params). +get_metrics(Name) -> + get_metrics(source, Name). + +get_metrics(SourceOrTargetCluster, Name) -> + Host = host(SourceOrTargetCluster), + Path = emqx_mgmt_api_test_util:api_path(Host, [api_root(), Name, "metrics"]), + emqx_mgmt_api_test_util:simple_request(get, Path, _Params = []). + +host(source) -> "http://127.0.0.1:18083"; +host(target) -> "http://127.0.0.1:28083". + link_params() -> link_params(_Overrides = #{}). @@ -194,6 +247,7 @@ t_crud(_Config) -> ?assertMatch({404, _}, get_link(NameA)), ?assertMatch({404, _}, delete_link(NameA)), ?assertMatch({404, _}, update_link(NameA, link_params())), + ?assertMatch({404, _}, get_metrics(NameA)), Params1 = link_params(), ?assertMatch( @@ -223,6 +277,7 @@ t_crud(_Config) -> }}, get_link(NameA) ), + ?assertMatch({200, _}, get_metrics(NameA)), Params2 = Params1#{<<"pool_size">> := 2}, ?assertMatch( @@ -238,6 +293,7 @@ t_crud(_Config) -> ?assertMatch({404, _}, delete_link(NameA)), ?assertMatch({404, _}, get_link(NameA)), ?assertMatch({404, _}, update_link(NameA, Params1)), + ?assertMatch({404, _}, get_metrics(NameA)), ?assertMatch({200, []}, list()), ok. @@ -298,6 +354,7 @@ t_status(Config) -> [Res1, {ok, {ok, Res2B}} | Rest] end) end), + on_exit(fun() -> catch ?ON(SN1, meck:unload()) end), ?assertMatch( {200, [ #{ @@ -385,3 +442,183 @@ t_status(Config) -> ), ok. + +t_metrics(Config) -> + ct:timetrap({seconds, 10}), + [SN1, SN2] = ?config(source_nodes, Config), + [TN1, TN2] = ?config(target_nodes, Config), + %% N.B. Link names on each cluster, so they are switched. + SourceName = <<"cl.target">>, + TargetName = <<"cl.source">>, + + ?assertMatch( + {200, #{ + <<"metrics">> := #{<<"routes">> := 0}, + <<"node_metrics">> := [ + #{ + <<"node">> := _, + <<"metrics">> := #{<<"routes">> := 0} + }, + #{ + <<"node">> := _, + <<"metrics">> := #{<<"routes">> := 0} + } + ] + }}, + get_metrics(source, SourceName) + ), + ?assertMatch( + {200, #{ + <<"metrics">> := #{<<"routes">> := 0}, + <<"node_metrics">> := [ + #{ + <<"node">> := _, + <<"metrics">> := #{<<"routes">> := 0} + }, + #{ + <<"node">> := _, + <<"metrics">> := #{<<"routes">> := 0} + } + ] + }}, + get_metrics(target, TargetName) + ), + + SourceC1 = emqx_cluster_link_SUITE:start_client(<<"sc1">>, SN1), + SourceC2 = emqx_cluster_link_SUITE:start_client(<<"sc2">>, SN2), + {ok, _, _} = emqtt:subscribe(SourceC1, <<"t/sc1">>), + {ok, _, _} = emqtt:subscribe(SourceC2, <<"t/sc2">>), + + %% Still no routes, as routes in the source cluster are replicated to the target + %% cluster. + ?assertMatch( + {200, #{ + <<"metrics">> := #{<<"routes">> := 0}, + <<"node_metrics">> := [ + #{ + <<"node">> := _, + <<"metrics">> := #{<<"routes">> := 0} + }, + #{ + <<"node">> := _, + <<"metrics">> := #{<<"routes">> := 0} + } + ] + }}, + get_metrics(source, SourceName) + ), + ?assertMatch( + {200, #{ + <<"metrics">> := #{<<"routes">> := 0}, + <<"node_metrics">> := [ + #{ + <<"node">> := _, + <<"metrics">> := #{<<"routes">> := 0} + }, + #{ + <<"node">> := _, + <<"metrics">> := #{<<"routes">> := 0} + } + ] + }}, + get_metrics(target, TargetName) + ), + + TargetC1 = emqx_cluster_link_SUITE:start_client(<<"tc1">>, TN1), + TargetC2 = emqx_cluster_link_SUITE:start_client(<<"tc2">>, TN2), + {ok, _, _} = emqtt:subscribe(TargetC1, <<"t/tc1">>), + {ok, _, _} = emqtt:subscribe(TargetC2, <<"t/tc2">>), + {_, {ok, _}} = + ?wait_async_action( + begin + {ok, _, _} = emqtt:subscribe(TargetC1, <<"t/tc1">>), + {ok, _, _} = emqtt:subscribe(TargetC2, <<"t/tc2">>) + end, + #{?snk_kind := clink_route_sync_complete} + ), + + %% Routes = 2 in source cluster, because the target cluster has some topic filters + %% configured and subscribers to them, which were replicated to the source cluster. + ?assertMatch( + {200, #{ + <<"metrics">> := #{<<"routes">> := 2}, + <<"node_metrics">> := _ + }}, + get_metrics(source, SourceName) + ), + ?assertMatch( + {200, #{ + <<"metrics">> := #{<<"routes">> := 0}, + <<"node_metrics">> := _ + }}, + get_metrics(target, TargetName) + ), + + %% Unsubscribe and remove route. + ct:pal("unsubscribing"), + {_, {ok, _}} = + ?wait_async_action( + begin + {ok, _, _} = emqtt:unsubscribe(TargetC1, <<"t/tc1">>) + end, + #{?snk_kind := clink_route_sync_complete} + ), + + ?assertMatch( + {200, #{ + <<"metrics">> := #{<<"routes">> := 1}, + <<"node_metrics">> := _ + }}, + get_metrics(source, SourceName) + ), + + %% Disabling the link should remove the routes. + ct:pal("disabling"), + {200, TargetLink0} = get_link(target, TargetName), + TargetLink1 = maps:without([<<"status">>, <<"node_status">>], TargetLink0), + TargetLink2 = TargetLink1#{<<"enable">> := false}, + {_, {ok, _}} = + ?wait_async_action( + begin + {200, _} = update_link(target, TargetName, TargetLink2), + %% Note that only when the GC runs and collects the stopped actor it'll actually + %% remove the routes + NowMS = erlang:system_time(millisecond), + TTL = emqx_cluster_link_config:actor_ttl(), + ct:pal("gc"), + %% 2 Actors: one for normal routes, one for PS routes + 1 = ?ON(SN1, emqx_cluster_link_extrouter:actor_gc(#{timestamp => NowMS + TTL * 3})), + 1 = ?ON(SN1, emqx_cluster_link_extrouter:actor_gc(#{timestamp => NowMS + TTL * 3})), + ct:pal("gc done"), + ok + end, + #{?snk_kind := "cluster_link_extrouter_route_deleted"} + ), + + ?assertMatch( + {200, #{ + <<"metrics">> := #{<<"routes">> := 0}, + <<"node_metrics">> := _ + }}, + get_metrics(source, SourceName) + ), + + %% Enabling again + TargetLink3 = TargetLink2#{<<"enable">> := true}, + {_, {ok, _}} = + ?wait_async_action( + begin + {200, _} = update_link(target, TargetName, TargetLink3) + end, + #{?snk_kind := "cluster_link_extrouter_route_added"} + ), + + ?assertMatch( + {200, #{ + <<"metrics">> := #{<<"routes">> := 1}, + <<"node_metrics">> := _ + }}, + get_metrics(source, SourceName) + ), + + ok.