feat(cluster link): add metrics

Fixes https://emqx.atlassian.net/browse/EMQX-12627
This commit is contained in:
Thales Macedo Garitezi 2024-07-17 18:01:52 -03:00
parent 07cb147d38
commit 6a5849488c
8 changed files with 450 additions and 29 deletions

View File

@ -17,3 +17,7 @@
%% Fairly compact text encoding. %% Fairly compact text encoding.
-define(SHARED_ROUTE_ID(Topic, Group), <<"$s/", Group/binary, "/", Topic/binary>>). -define(SHARED_ROUTE_ID(Topic, Group), <<"$s/", Group/binary, "/", Topic/binary>>).
-define(PERSISTENT_ROUTE_ID(Topic, ID), <<"$p/", ID/binary, "/", Topic/binary>>). -define(PERSISTENT_ROUTE_ID(Topic, ID), <<"$p/", ID/binary, "/", Topic/binary>>).
-define(METRIC_NAME, cluster_link).
-define(route_metric, 'routes').

View File

@ -26,11 +26,29 @@
on_message_publish/1 on_message_publish/1
]). ]).
%% metrics API
-export([
maybe_create_metrics/1,
drop_metrics/1,
get_metrics/1,
routes_inc/2
]).
-include("emqx_cluster_link.hrl"). -include("emqx_cluster_link.hrl").
-include_lib("emqx/include/emqx.hrl"). -include_lib("emqx/include/emqx.hrl").
-include_lib("emqx/include/emqx_hooks.hrl"). -include_lib("emqx/include/emqx_hooks.hrl").
-include_lib("emqx/include/logger.hrl"). -include_lib("emqx/include/logger.hrl").
%%--------------------------------------------------------------------
%% Type definitions
%%--------------------------------------------------------------------
-define(METRICS, [
?route_metric
]).
-define(RATE_METRICS, []).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% emqx_external_broker API %% emqx_external_broker API
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
@ -132,6 +150,32 @@ put_hook() ->
delete_hook() -> delete_hook() ->
emqx_hooks:del('message.publish', {?MODULE, on_message_publish, []}). emqx_hooks:del('message.publish', {?MODULE, on_message_publish, []}).
%%--------------------------------------------------------------------
%% metrics API
%%--------------------------------------------------------------------
get_metrics(ClusterName) ->
Nodes = emqx:running_nodes(),
Timeout = 15_000,
Results = emqx_metrics_proto_v2:get_metrics(Nodes, ?METRIC_NAME, ClusterName, Timeout),
sequence_multicall_results(Nodes, Results).
maybe_create_metrics(ClusterName) ->
case emqx_metrics_worker:has_metrics(?METRIC_NAME, ClusterName) of
true ->
ok = emqx_metrics_worker:reset_metrics(?METRIC_NAME, ClusterName);
false ->
ok = emqx_metrics_worker:create_metrics(
?METRIC_NAME, ClusterName, ?METRICS, ?RATE_METRICS
)
end.
drop_metrics(ClusterName) ->
ok = emqx_metrics_worker:clear_metrics(?METRIC_NAME, ClusterName).
routes_inc(ClusterName, Val) ->
catch emqx_metrics_worker:inc(?METRIC_NAME, ClusterName, ?route_metric, Val).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Internal functions %% Internal functions
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
@ -253,3 +297,16 @@ maybe_filter_incomming_msg(#message{topic = T} = Msg, ClusterName) ->
true -> with_sender_name(Msg, ClusterName); true -> with_sender_name(Msg, ClusterName);
false -> [] false -> []
end. end.
-spec sequence_multicall_results([node()], emqx_rpc:erpc_multicall(term())) ->
{ok, [{node(), term()}]} | {error, [term()]}.
sequence_multicall_results(Nodes, Results) ->
case lists:partition(fun is_ok/1, lists:zip(Nodes, Results)) of
{OkResults, []} ->
{ok, [{Node, Res} || {Node, {ok, Res}} <- OkResults]};
{_OkResults, BadResults} ->
{error, BadResults}
end.
is_ok({_Node, {ok, _}}) -> true;
is_ok(_) -> false.

View File

@ -10,6 +10,7 @@
-include_lib("emqx_utils/include/emqx_utils_api.hrl"). -include_lib("emqx_utils/include/emqx_utils_api.hrl").
-include_lib("emqx_resource/include/emqx_resource.hrl"). -include_lib("emqx_resource/include/emqx_resource.hrl").
-include_lib("emqx/include/logger.hrl"). -include_lib("emqx/include/logger.hrl").
-include("emqx_cluster_link.hrl").
-export([ -export([
api_spec/0, api_spec/0,
@ -21,7 +22,8 @@
-export([ -export([
'/cluster/links'/2, '/cluster/links'/2,
'/cluster/links/:name'/2 '/cluster/links/:name'/2,
'/cluster/links/:name/metrics'/2
]). ]).
-define(CONF_PATH, [cluster, links]). -define(CONF_PATH, [cluster, links]).
@ -37,7 +39,8 @@ api_spec() ->
paths() -> paths() ->
[ [
"/cluster/links", "/cluster/links",
"/cluster/links/:name" "/cluster/links/:name",
"/cluster/links/:name/metrics"
]. ].
schema("/cluster/links") -> schema("/cluster/links") ->
@ -113,6 +116,23 @@ schema("/cluster/links/:name") ->
) )
} }
} }
};
schema("/cluster/links/:name/metrics") ->
#{
'operationId' => '/cluster/links/:name/metrics',
get =>
#{
description => "Get a cluster link metrics",
tags => ?TAGS,
parameters => [param_path_name()],
responses =>
#{
200 => link_metrics_schema_response(),
404 => emqx_dashboard_swagger:error_codes(
[?NOT_FOUND], <<"Cluster link not found">>
)
}
}
}. }.
fields(link_config_response) -> fields(link_config_response) ->
@ -120,6 +140,24 @@ fields(link_config_response) ->
{node, hoconsc:mk(binary(), #{desc => ?DESC("node")})}, {node, hoconsc:mk(binary(), #{desc => ?DESC("node")})},
{status, hoconsc:mk(status(), #{desc => ?DESC("status")})} {status, hoconsc:mk(status(), #{desc => ?DESC("status")})}
| emqx_cluster_link_schema:fields("link") | emqx_cluster_link_schema:fields("link")
];
fields(metrics) ->
[
{metrics, hoconsc:mk(map(), #{desc => ?DESC("metrics")})}
];
fields(link_metrics_response) ->
[
{node_metrics,
hoconsc:mk(
hoconsc:array(hoconsc:ref(?MODULE, node_metrics)),
#{desc => ?DESC("node_metrics")}
)}
| fields(metrics)
];
fields(node_metrics) ->
[
{node, hoconsc:mk(atom(), #{desc => ?DESC("node")})}
| fields(metrics)
]. ].
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
@ -154,6 +192,9 @@ fields(link_config_response) ->
not_found() not_found()
). ).
'/cluster/links/:name/metrics'(get, #{bindings := #{name := Name}}) ->
with_link(Name, fun() -> handle_metrics(Name) end, not_found()).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Internal funcs %% Internal funcs
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
@ -185,6 +226,46 @@ handle_create(Name, Params) ->
handle_lookup(Name, Link) -> handle_lookup(Name, Link) ->
?OK(add_status(Name, Link)). ?OK(add_status(Name, Link)).
handle_metrics(Name) ->
case emqx_cluster_link:get_metrics(Name) of
{error, BadResults} ->
?SLOG(warning, #{
msg => "cluster_link_api_metrics_bad_erpc_results",
results => BadResults
}),
?OK(#{metrics => #{}, node_metrics => []});
{ok, NodeResults} ->
NodeMetrics =
lists:map(
fun({Node, Metrics}) ->
format_metrics(Node, Metrics)
end,
NodeResults
),
AggregatedMetrics = aggregate_metrics(NodeMetrics),
Response = #{metrics => AggregatedMetrics, node_metrics => NodeMetrics},
?OK(Response)
end.
aggregate_metrics(NodeMetrics) ->
ErrorLogger = fun(_) -> ok end,
lists:foldl(
fun(#{metrics := Metrics}, Acc) ->
emqx_utils_maps:best_effort_recursive_sum(Metrics, Acc, ErrorLogger)
end,
#{},
NodeMetrics
).
format_metrics(Node, Metrics) ->
Routes = emqx_utils_maps:deep_get([counters, ?route_metric], Metrics, 0),
#{
node => Node,
metrics => #{
?route_metric => Routes
}
}.
add_status(Name, Link) -> add_status(Name, Link) ->
NodeResults = get_link_status_cluster(Name), NodeResults = get_link_status_cluster(Name),
Status = collect_single_status(NodeResults), Status = collect_single_status(NodeResults),
@ -330,6 +411,16 @@ link_config_schema_response() ->
} }
). ).
link_metrics_schema_response() ->
hoconsc:mk(
hoconsc:ref(?MODULE, link_metrics_response),
#{
examples => #{
<<"example">> => link_metrics_response_example()
}
}
).
status() -> status() ->
hoconsc:enum([?status_connected, ?status_disconnected, ?status_connecting, inconsistent]). hoconsc:enum([?status_connected, ?status_disconnected, ?status_connecting, inconsistent]).
@ -392,6 +483,17 @@ links_config_example() ->
} }
]. ].
link_metrics_response_example() ->
#{
<<"metrics">> => #{<<"routes">> => 10240},
<<"node_metrics">> => [
#{
<<"node">> => <<"emqx1@emqx.net">>,
<<"metrics">> => #{<<"routes">> => 10240}
}
]
}.
with_link(Name, FoundFn, NotFoundFn) -> with_link(Name, FoundFn, NotFoundFn) ->
case emqx_cluster_link_config:link_raw(Name) of case emqx_cluster_link_config:link_raw(Name) of
undefined -> undefined ->

View File

@ -22,7 +22,9 @@ start(_StartType, _StartArgs) ->
_ -> _ ->
ok ok
end, end,
emqx_cluster_link_sup:start_link(LinksConf). {ok, Sup} = emqx_cluster_link_sup:start_link(LinksConf),
ok = create_metrics(LinksConf),
{ok, Sup}.
prep_stop(State) -> prep_stop(State) ->
emqx_cluster_link_config:remove_handler(), emqx_cluster_link_config:remove_handler(),
@ -53,3 +55,11 @@ remove_msg_fwd_resources(LinksConf) ->
end, end,
LinksConf LinksConf
). ).
create_metrics(LinksConf) ->
lists:foreach(
fun(#{name := ClusterName}) ->
ok = emqx_cluster_link:maybe_create_metrics(ClusterName)
end,
LinksConf
).

View File

@ -277,9 +277,10 @@ all_ok(Results) ->
add_links(LinksConf) -> add_links(LinksConf) ->
[add_link(Link) || Link <- LinksConf]. [add_link(Link) || Link <- LinksConf].
add_link(#{enable := true} = LinkConf) -> add_link(#{name := ClusterName, enable := true} = LinkConf) ->
{ok, _Pid} = emqx_cluster_link_sup:ensure_actor(LinkConf), {ok, _Pid} = emqx_cluster_link_sup:ensure_actor(LinkConf),
{ok, _} = emqx_cluster_link_mqtt:ensure_msg_fwd_resource(LinkConf), {ok, _} = emqx_cluster_link_mqtt:ensure_msg_fwd_resource(LinkConf),
ok = emqx_cluster_link:maybe_create_metrics(ClusterName),
ok; ok;
add_link(_DisabledLinkConf) -> add_link(_DisabledLinkConf) ->
ok. ok.
@ -289,7 +290,8 @@ remove_links(LinksConf) ->
remove_link(Name) -> remove_link(Name) ->
_ = emqx_cluster_link_mqtt:remove_msg_fwd_resource(Name), _ = emqx_cluster_link_mqtt:remove_msg_fwd_resource(Name),
ensure_actor_stopped(Name). _ = ensure_actor_stopped(Name),
emqx_cluster_link:drop_metrics(Name).
update_links(LinksConf) -> update_links(LinksConf) ->
[update_link(Link) || Link <- LinksConf]. [update_link(Link) || Link <- LinksConf].

View File

@ -256,31 +256,35 @@ actor_apply_operation(
apply_actor_operation(ActorID, Incarnation, Entry, OpName, Lane) -> apply_actor_operation(ActorID, Incarnation, Entry, OpName, Lane) ->
_ = assert_current_incarnation(ActorID, Incarnation), _ = assert_current_incarnation(ActorID, Incarnation),
apply_operation(Entry, OpName, Lane). apply_operation(ActorID, Entry, OpName, Lane).
apply_operation(Entry, OpName, Lane) -> apply_operation(ActorID, Entry, OpName, Lane) ->
%% NOTE %% NOTE
%% This is safe sequence of operations only on core nodes. On replicants, %% This is safe sequence of operations only on core nodes. On replicants,
%% `mria:dirty_update_counter/3` will be replicated asynchronously, which %% `mria:dirty_update_counter/3` will be replicated asynchronously, which
%% means this read can be stale. %% means this read can be stale.
case mnesia:dirty_read(?EXTROUTE_TAB, Entry) of case mnesia:dirty_read(?EXTROUTE_TAB, Entry) of
[#extroute{mcounter = MCounter}] -> [#extroute{mcounter = MCounter}] ->
apply_operation(Entry, MCounter, OpName, Lane); apply_operation(ActorID, Entry, MCounter, OpName, Lane);
[] -> [] ->
apply_operation(Entry, 0, OpName, Lane) apply_operation(ActorID, Entry, 0, OpName, Lane)
end. end.
apply_operation(Entry, MCounter, OpName, Lane) -> apply_operation(ActorID, Entry, MCounter, OpName, Lane) ->
%% NOTE %% NOTE
%% We are relying on the fact that changes to each individual lane of this %% We are relying on the fact that changes to each individual lane of this
%% multi-counter are synchronized. Without this, such counter updates would %% multi-counter are synchronized. Without this, such counter updates would
%% be unsafe. Instead, we would have to use another, more complex approach, %% be unsafe. Instead, we would have to use another, more complex approach,
%% that runs `ets:lookup/2` + `ets:select_replace/2` in a loop until the %% that runs `ets:lookup/2` + `ets:select_replace/2` in a loop until the
%% counter is updated accordingly. %% counter is updated accordingly.
?ACTOR_ID(ClusterName, _Actor) = ActorID,
Marker = 1 bsl Lane, Marker = 1 bsl Lane,
case MCounter band Marker of case MCounter band Marker of
0 when OpName =:= add -> 0 when OpName =:= add ->
mria:dirty_update_counter(?EXTROUTE_TAB, Entry, Marker); Res = mria:dirty_update_counter(?EXTROUTE_TAB, Entry, Marker),
_ = emqx_cluster_link:routes_inc(ClusterName, 1),
?tp("cluster_link_extrouter_route_added", #{}),
Res;
Marker when OpName =:= add -> Marker when OpName =:= add ->
%% Already added. %% Already added.
MCounter; MCounter;
@ -289,6 +293,8 @@ apply_operation(Entry, MCounter, OpName, Lane) ->
0 -> 0 ->
Record = #extroute{entry = Entry, mcounter = 0}, Record = #extroute{entry = Entry, mcounter = 0},
ok = mria:dirty_delete_object(?EXTROUTE_TAB, Record), ok = mria:dirty_delete_object(?EXTROUTE_TAB, Record),
_ = emqx_cluster_link:routes_inc(ClusterName, -1),
?tp("cluster_link_extrouter_route_deleted", #{}),
0; 0;
C -> C ->
C C
@ -362,16 +368,16 @@ clean_incarnation(Rec = #actor{id = {Cluster, Actor}}) ->
mnesia_clean_incarnation(#actor{id = Actor, incarnation = Incarnation, lane = Lane}) -> mnesia_clean_incarnation(#actor{id = Actor, incarnation = Incarnation, lane = Lane}) ->
case mnesia:read(?EXTROUTE_ACTOR_TAB, Actor, write) of case mnesia:read(?EXTROUTE_ACTOR_TAB, Actor, write) of
[#actor{incarnation = Incarnation}] -> [#actor{incarnation = Incarnation}] ->
_ = clean_lane(Lane), _ = clean_lane(Actor, Lane),
mnesia:delete(?EXTROUTE_ACTOR_TAB, Actor, write); mnesia:delete(?EXTROUTE_ACTOR_TAB, Actor, write);
_Renewed -> _Renewed ->
stale stale
end. end.
clean_lane(Lane) -> clean_lane(ActorID, Lane) ->
ets:foldl( ets:foldl(
fun(#extroute{entry = Entry, mcounter = MCounter}, _) -> fun(#extroute{entry = Entry, mcounter = MCounter}, _) ->
apply_operation(Entry, MCounter, delete, Lane) apply_operation(ActorID, Entry, MCounter, delete, Lane)
end, end,
0, 0,
?EXTROUTE_TAB ?EXTROUTE_TAB

View File

@ -6,6 +6,8 @@
-behaviour(supervisor). -behaviour(supervisor).
-include("emqx_cluster_link.hrl").
-export([start_link/1]). -export([start_link/1]).
-export([ -export([
@ -27,12 +29,13 @@ init(LinksConf) ->
intensity => 10, intensity => 10,
period => 5 period => 5
}, },
Metrics = emqx_metrics_worker:child_spec(metrics, ?METRIC_NAME),
ExtrouterGC = extrouter_gc_spec(), ExtrouterGC = extrouter_gc_spec(),
RouteActors = [ RouteActors = [
sup_spec(Name, ?ACTOR_MODULE, [LinkConf]) sup_spec(Name, ?ACTOR_MODULE, [LinkConf])
|| #{name := Name} = LinkConf <- LinksConf || #{name := Name} = LinkConf <- LinksConf
], ],
{ok, {SupFlags, [ExtrouterGC | RouteActors]}}. {ok, {SupFlags, [Metrics, ExtrouterGC | RouteActors]}}.
extrouter_gc_spec() -> extrouter_gc_spec() ->
%% NOTE: This one is currently global, not per-link. %% NOTE: This one is currently global, not per-link.

View File

@ -11,6 +11,8 @@
-include_lib("common_test/include/ct.hrl"). -include_lib("common_test/include/ct.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl").
-import(emqx_common_test_helpers, [on_exit/1]).
-define(API_PATH, emqx_mgmt_api_test_util:api_path(["cluster", "links"])). -define(API_PATH, emqx_mgmt_api_test_util:api_path(["cluster", "links"])).
-define(CONF_PATH, [cluster, links]). -define(CONF_PATH, [cluster, links]).
@ -44,7 +46,21 @@
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
all() -> all() ->
emqx_common_test_helpers:all(?MODULE). AllTCs = emqx_common_test_helpers:all(?MODULE),
OtherTCs = AllTCs -- cluster_test_cases(),
[
{group, cluster}
| OtherTCs
].
groups() ->
[{cluster, cluster_test_cases()}].
cluster_test_cases() ->
[
t_status,
t_metrics
].
init_per_suite(Config) -> init_per_suite(Config) ->
%% This is called by emqx_machine in EMQX release %% This is called by emqx_machine in EMQX release
@ -66,30 +82,35 @@ end_per_suite(Config) ->
emqx_config:delete_override_conf_files(), emqx_config:delete_override_conf_files(),
ok. ok.
auth_header() -> init_per_group(cluster = Group, Config) ->
emqx_mgmt_api_test_util:auth_header_().
init_per_testcase(t_status = TestCase, Config) ->
ok = emqx_cth_suite:stop_apps([emqx_dashboard]), ok = emqx_cth_suite:stop_apps([emqx_dashboard]),
SourceClusterSpec = emqx_cluster_link_SUITE:mk_source_cluster(TestCase, Config), SourceClusterSpec = emqx_cluster_link_SUITE:mk_source_cluster(Group, Config),
TargetClusterSpec = emqx_cluster_link_SUITE:mk_target_cluster(TestCase, Config), TargetClusterSpec = emqx_cluster_link_SUITE:mk_target_cluster(Group, Config),
SourceNodes = [SN1 | _] = emqx_cth_cluster:start(SourceClusterSpec), SourceNodes = [SN1 | _] = emqx_cth_cluster:start(SourceClusterSpec),
TargetNodes = emqx_cth_cluster:start(TargetClusterSpec), TargetNodes = [TN1 | _] = emqx_cth_cluster:start(TargetClusterSpec),
emqx_cluster_link_SUITE:start_cluster_link(SourceNodes ++ TargetNodes, Config), emqx_cluster_link_SUITE:start_cluster_link(SourceNodes ++ TargetNodes, Config),
erpc:call(SN1, emqx_cth_suite, start_apps, [ erpc:call(SN1, emqx_cth_suite, start_apps, [
[emqx_management, emqx_mgmt_api_test_util:emqx_dashboard()], [emqx_management, emqx_mgmt_api_test_util:emqx_dashboard()],
#{work_dir => emqx_cth_suite:work_dir(TestCase, Config)} #{work_dir => emqx_cth_suite:work_dir(Group, Config)}
]),
erpc:call(TN1, emqx_cth_suite, start_apps, [
[
emqx_management,
emqx_mgmt_api_test_util:emqx_dashboard(
"dashboard.listeners.http { enable = true, bind = 28083 }"
)
],
#{work_dir => emqx_cth_suite:work_dir(Group, Config)}
]), ]),
[ [
{source_nodes, SourceNodes}, {source_nodes, SourceNodes},
{target_nodes, TargetNodes} {target_nodes, TargetNodes}
| Config | Config
]; ];
init_per_testcase(_TC, Config) -> init_per_group(_Group, Config) ->
{ok, _} = emqx_cluster_link_config:update([]),
Config. Config.
end_per_testcase(t_status, Config) -> end_per_group(cluster, Config) ->
SourceNodes = ?config(source_nodes, Config), SourceNodes = ?config(source_nodes, Config),
TargetNodes = ?config(target_nodes, Config), TargetNodes = ?config(target_nodes, Config),
ok = emqx_cth_cluster:stop(SourceNodes), ok = emqx_cth_cluster:stop(SourceNodes),
@ -99,7 +120,20 @@ end_per_testcase(t_status, Config) ->
#{work_dir => emqx_cth_suite:work_dir(Config)} #{work_dir => emqx_cth_suite:work_dir(Config)}
), ),
ok; ok;
end_per_group(_Group, _Config) ->
ok.
auth_header() ->
emqx_mgmt_api_test_util:auth_header_().
init_per_testcase(_TC, Config) ->
{ok, _} = emqx_cluster_link_config:update([]),
snabbkaffe:start_trace(),
Config.
end_per_testcase(_TC, _Config) -> end_per_testcase(_TC, _Config) ->
snabbkaffe:stop(),
emqx_common_test_helpers:call_janitor(),
ok. ok.
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
@ -114,7 +148,11 @@ list() ->
emqx_mgmt_api_test_util:simple_request(get, Path, _Params = ""). emqx_mgmt_api_test_util:simple_request(get, Path, _Params = "").
get_link(Name) -> get_link(Name) ->
Path = emqx_mgmt_api_test_util:api_path([api_root(), Name]), get_link(source, Name).
get_link(SourceOrTargetCluster, Name) ->
Host = host(SourceOrTargetCluster),
Path = emqx_mgmt_api_test_util:api_path(Host, [api_root(), Name]),
emqx_mgmt_api_test_util:simple_request(get, Path, _Params = ""). emqx_mgmt_api_test_util:simple_request(get, Path, _Params = "").
delete_link(Name) -> delete_link(Name) ->
@ -122,7 +160,11 @@ delete_link(Name) ->
emqx_mgmt_api_test_util:simple_request(delete, Path, _Params = ""). emqx_mgmt_api_test_util:simple_request(delete, Path, _Params = "").
update_link(Name, Params) -> update_link(Name, Params) ->
Path = emqx_mgmt_api_test_util:api_path([api_root(), Name]), update_link(source, Name, Params).
update_link(SourceOrTargetCluster, Name, Params) ->
Host = host(SourceOrTargetCluster),
Path = emqx_mgmt_api_test_util:api_path(Host, [api_root(), Name]),
emqx_mgmt_api_test_util:simple_request(put, Path, Params). emqx_mgmt_api_test_util:simple_request(put, Path, Params).
create_link(Name, Params0) -> create_link(Name, Params0) ->
@ -130,6 +172,17 @@ create_link(Name, Params0) ->
Path = emqx_mgmt_api_test_util:api_path([api_root()]), Path = emqx_mgmt_api_test_util:api_path([api_root()]),
emqx_mgmt_api_test_util:simple_request(post, Path, Params). emqx_mgmt_api_test_util:simple_request(post, Path, Params).
get_metrics(Name) ->
get_metrics(source, Name).
get_metrics(SourceOrTargetCluster, Name) ->
Host = host(SourceOrTargetCluster),
Path = emqx_mgmt_api_test_util:api_path(Host, [api_root(), Name, "metrics"]),
emqx_mgmt_api_test_util:simple_request(get, Path, _Params = []).
host(source) -> "http://127.0.0.1:18083";
host(target) -> "http://127.0.0.1:28083".
link_params() -> link_params() ->
link_params(_Overrides = #{}). link_params(_Overrides = #{}).
@ -194,6 +247,7 @@ t_crud(_Config) ->
?assertMatch({404, _}, get_link(NameA)), ?assertMatch({404, _}, get_link(NameA)),
?assertMatch({404, _}, delete_link(NameA)), ?assertMatch({404, _}, delete_link(NameA)),
?assertMatch({404, _}, update_link(NameA, link_params())), ?assertMatch({404, _}, update_link(NameA, link_params())),
?assertMatch({404, _}, get_metrics(NameA)),
Params1 = link_params(), Params1 = link_params(),
?assertMatch( ?assertMatch(
@ -223,6 +277,7 @@ t_crud(_Config) ->
}}, }},
get_link(NameA) get_link(NameA)
), ),
?assertMatch({200, _}, get_metrics(NameA)),
Params2 = Params1#{<<"pool_size">> := 2}, Params2 = Params1#{<<"pool_size">> := 2},
?assertMatch( ?assertMatch(
@ -238,6 +293,7 @@ t_crud(_Config) ->
?assertMatch({404, _}, delete_link(NameA)), ?assertMatch({404, _}, delete_link(NameA)),
?assertMatch({404, _}, get_link(NameA)), ?assertMatch({404, _}, get_link(NameA)),
?assertMatch({404, _}, update_link(NameA, Params1)), ?assertMatch({404, _}, update_link(NameA, Params1)),
?assertMatch({404, _}, get_metrics(NameA)),
?assertMatch({200, []}, list()), ?assertMatch({200, []}, list()),
ok. ok.
@ -298,6 +354,7 @@ t_status(Config) ->
[Res1, {ok, {ok, Res2B}} | Rest] [Res1, {ok, {ok, Res2B}} | Rest]
end) end)
end), end),
on_exit(fun() -> catch ?ON(SN1, meck:unload()) end),
?assertMatch( ?assertMatch(
{200, [ {200, [
#{ #{
@ -385,3 +442,183 @@ t_status(Config) ->
), ),
ok. ok.
t_metrics(Config) ->
ct:timetrap({seconds, 10}),
[SN1, SN2] = ?config(source_nodes, Config),
[TN1, TN2] = ?config(target_nodes, Config),
%% N.B. Link names on each cluster, so they are switched.
SourceName = <<"cl.target">>,
TargetName = <<"cl.source">>,
?assertMatch(
{200, #{
<<"metrics">> := #{<<"routes">> := 0},
<<"node_metrics">> := [
#{
<<"node">> := _,
<<"metrics">> := #{<<"routes">> := 0}
},
#{
<<"node">> := _,
<<"metrics">> := #{<<"routes">> := 0}
}
]
}},
get_metrics(source, SourceName)
),
?assertMatch(
{200, #{
<<"metrics">> := #{<<"routes">> := 0},
<<"node_metrics">> := [
#{
<<"node">> := _,
<<"metrics">> := #{<<"routes">> := 0}
},
#{
<<"node">> := _,
<<"metrics">> := #{<<"routes">> := 0}
}
]
}},
get_metrics(target, TargetName)
),
SourceC1 = emqx_cluster_link_SUITE:start_client(<<"sc1">>, SN1),
SourceC2 = emqx_cluster_link_SUITE:start_client(<<"sc2">>, SN2),
{ok, _, _} = emqtt:subscribe(SourceC1, <<"t/sc1">>),
{ok, _, _} = emqtt:subscribe(SourceC2, <<"t/sc2">>),
%% Still no routes, as routes in the source cluster are replicated to the target
%% cluster.
?assertMatch(
{200, #{
<<"metrics">> := #{<<"routes">> := 0},
<<"node_metrics">> := [
#{
<<"node">> := _,
<<"metrics">> := #{<<"routes">> := 0}
},
#{
<<"node">> := _,
<<"metrics">> := #{<<"routes">> := 0}
}
]
}},
get_metrics(source, SourceName)
),
?assertMatch(
{200, #{
<<"metrics">> := #{<<"routes">> := 0},
<<"node_metrics">> := [
#{
<<"node">> := _,
<<"metrics">> := #{<<"routes">> := 0}
},
#{
<<"node">> := _,
<<"metrics">> := #{<<"routes">> := 0}
}
]
}},
get_metrics(target, TargetName)
),
TargetC1 = emqx_cluster_link_SUITE:start_client(<<"tc1">>, TN1),
TargetC2 = emqx_cluster_link_SUITE:start_client(<<"tc2">>, TN2),
{ok, _, _} = emqtt:subscribe(TargetC1, <<"t/tc1">>),
{ok, _, _} = emqtt:subscribe(TargetC2, <<"t/tc2">>),
{_, {ok, _}} =
?wait_async_action(
begin
{ok, _, _} = emqtt:subscribe(TargetC1, <<"t/tc1">>),
{ok, _, _} = emqtt:subscribe(TargetC2, <<"t/tc2">>)
end,
#{?snk_kind := clink_route_sync_complete}
),
%% Routes = 2 in source cluster, because the target cluster has some topic filters
%% configured and subscribers to them, which were replicated to the source cluster.
?assertMatch(
{200, #{
<<"metrics">> := #{<<"routes">> := 2},
<<"node_metrics">> := _
}},
get_metrics(source, SourceName)
),
?assertMatch(
{200, #{
<<"metrics">> := #{<<"routes">> := 0},
<<"node_metrics">> := _
}},
get_metrics(target, TargetName)
),
%% Unsubscribe and remove route.
ct:pal("unsubscribing"),
{_, {ok, _}} =
?wait_async_action(
begin
{ok, _, _} = emqtt:unsubscribe(TargetC1, <<"t/tc1">>)
end,
#{?snk_kind := clink_route_sync_complete}
),
?assertMatch(
{200, #{
<<"metrics">> := #{<<"routes">> := 1},
<<"node_metrics">> := _
}},
get_metrics(source, SourceName)
),
%% Disabling the link should remove the routes.
ct:pal("disabling"),
{200, TargetLink0} = get_link(target, TargetName),
TargetLink1 = maps:without([<<"status">>, <<"node_status">>], TargetLink0),
TargetLink2 = TargetLink1#{<<"enable">> := false},
{_, {ok, _}} =
?wait_async_action(
begin
{200, _} = update_link(target, TargetName, TargetLink2),
%% Note that only when the GC runs and collects the stopped actor it'll actually
%% remove the routes
NowMS = erlang:system_time(millisecond),
TTL = emqx_cluster_link_config:actor_ttl(),
ct:pal("gc"),
%% 2 Actors: one for normal routes, one for PS routes
1 = ?ON(SN1, emqx_cluster_link_extrouter:actor_gc(#{timestamp => NowMS + TTL * 3})),
1 = ?ON(SN1, emqx_cluster_link_extrouter:actor_gc(#{timestamp => NowMS + TTL * 3})),
ct:pal("gc done"),
ok
end,
#{?snk_kind := "cluster_link_extrouter_route_deleted"}
),
?assertMatch(
{200, #{
<<"metrics">> := #{<<"routes">> := 0},
<<"node_metrics">> := _
}},
get_metrics(source, SourceName)
),
%% Enabling again
TargetLink3 = TargetLink2#{<<"enable">> := true},
{_, {ok, _}} =
?wait_async_action(
begin
{200, _} = update_link(target, TargetName, TargetLink3)
end,
#{?snk_kind := "cluster_link_extrouter_route_added"}
),
?assertMatch(
{200, #{
<<"metrics">> := #{<<"routes">> := 1},
<<"node_metrics">> := _
}},
get_metrics(source, SourceName)
),
ok.