diff --git a/apps/emqx_cluster_link/src/emqx_cluster_link.erl b/apps/emqx_cluster_link/src/emqx_cluster_link.erl index e3bc04a29..76228c052 100644 --- a/apps/emqx_cluster_link/src/emqx_cluster_link.erl +++ b/apps/emqx_cluster_link/src/emqx_cluster_link.erl @@ -26,29 +26,11 @@ on_message_publish/1 ]). -%% metrics API --export([ - maybe_create_metrics/1, - drop_metrics/1, - - get_metrics/1, - routes_inc/2 -]). - -include("emqx_cluster_link.hrl"). -include_lib("emqx/include/emqx.hrl"). -include_lib("emqx/include/emqx_hooks.hrl"). -include_lib("emqx/include/logger.hrl"). -%%-------------------------------------------------------------------- -%% Type definitions -%%-------------------------------------------------------------------- - --define(METRICS, [ - ?route_metric -]). --define(RATE_METRICS, []). - %%-------------------------------------------------------------------- %% emqx_external_broker API %%-------------------------------------------------------------------- @@ -150,32 +132,6 @@ put_hook() -> delete_hook() -> emqx_hooks:del('message.publish', {?MODULE, on_message_publish, []}). -%%-------------------------------------------------------------------- -%% metrics API -%%-------------------------------------------------------------------- - -get_metrics(ClusterName) -> - Nodes = emqx:running_nodes(), - Timeout = 15_000, - Results = emqx_metrics_proto_v2:get_metrics(Nodes, ?METRIC_NAME, ClusterName, Timeout), - sequence_multicall_results(Nodes, Results). - -maybe_create_metrics(ClusterName) -> - case emqx_metrics_worker:has_metrics(?METRIC_NAME, ClusterName) of - true -> - ok = emqx_metrics_worker:reset_metrics(?METRIC_NAME, ClusterName); - false -> - ok = emqx_metrics_worker:create_metrics( - ?METRIC_NAME, ClusterName, ?METRICS, ?RATE_METRICS - ) - end. - -drop_metrics(ClusterName) -> - ok = emqx_metrics_worker:clear_metrics(?METRIC_NAME, ClusterName). - -routes_inc(ClusterName, Val) -> - catch emqx_metrics_worker:inc(?METRIC_NAME, ClusterName, ?route_metric, Val). - %%-------------------------------------------------------------------- %% Internal functions %%-------------------------------------------------------------------- @@ -297,16 +253,3 @@ maybe_filter_incomming_msg(#message{topic = T} = Msg, ClusterName) -> true -> with_sender_name(Msg, ClusterName); false -> [] end. - --spec sequence_multicall_results([node()], emqx_rpc:erpc_multicall(term())) -> - {ok, [{node(), term()}]} | {error, [term()]}. -sequence_multicall_results(Nodes, Results) -> - case lists:partition(fun is_ok/1, lists:zip(Nodes, Results)) of - {OkResults, []} -> - {ok, [{Node, Res} || {Node, {ok, Res}} <- OkResults]}; - {_OkResults, BadResults} -> - {error, BadResults} - end. - -is_ok({_Node, {ok, _}}) -> true; -is_ok(_) -> false. diff --git a/apps/emqx_cluster_link/src/emqx_cluster_link_api.erl b/apps/emqx_cluster_link/src/emqx_cluster_link_api.erl index 7e70a9ccc..0d748a267 100644 --- a/apps/emqx_cluster_link/src/emqx_cluster_link_api.erl +++ b/apps/emqx_cluster_link/src/emqx_cluster_link_api.erl @@ -227,7 +227,7 @@ handle_lookup(Name, Link) -> ?OK(add_status(Name, Link)). handle_metrics(Name) -> - case emqx_cluster_link:get_metrics(Name) of + case emqx_cluster_link_metrics:get_metrics(Name) of {error, BadResults} -> ?SLOG(warning, #{ msg => "cluster_link_api_metrics_bad_erpc_results", diff --git a/apps/emqx_cluster_link/src/emqx_cluster_link_app.erl b/apps/emqx_cluster_link/src/emqx_cluster_link_app.erl index f9625fae4..9502ad1c3 100644 --- a/apps/emqx_cluster_link/src/emqx_cluster_link_app.erl +++ b/apps/emqx_cluster_link/src/emqx_cluster_link_app.erl @@ -59,7 +59,7 @@ remove_msg_fwd_resources(LinksConf) -> create_metrics(LinksConf) -> lists:foreach( fun(#{name := ClusterName}) -> - ok = emqx_cluster_link:maybe_create_metrics(ClusterName) + ok = emqx_cluster_link_metrics:maybe_create_metrics(ClusterName) end, LinksConf ). diff --git a/apps/emqx_cluster_link/src/emqx_cluster_link_config.erl b/apps/emqx_cluster_link/src/emqx_cluster_link_config.erl index 0455ab21c..5e257a247 100644 --- a/apps/emqx_cluster_link/src/emqx_cluster_link_config.erl +++ b/apps/emqx_cluster_link/src/emqx_cluster_link_config.erl @@ -280,7 +280,7 @@ add_links(LinksConf) -> add_link(#{name := ClusterName, enable := true} = LinkConf) -> {ok, _Pid} = emqx_cluster_link_sup:ensure_actor(LinkConf), {ok, _} = emqx_cluster_link_mqtt:ensure_msg_fwd_resource(LinkConf), - ok = emqx_cluster_link:maybe_create_metrics(ClusterName), + ok = emqx_cluster_link_metrics:maybe_create_metrics(ClusterName), ok; add_link(_DisabledLinkConf) -> ok. @@ -291,7 +291,7 @@ remove_links(LinksConf) -> remove_link(Name) -> _ = emqx_cluster_link_mqtt:remove_msg_fwd_resource(Name), _ = ensure_actor_stopped(Name), - emqx_cluster_link:drop_metrics(Name). + emqx_cluster_link_metrics:drop_metrics(Name). update_links(LinksConf) -> [update_link(Link) || Link <- LinksConf]. diff --git a/apps/emqx_cluster_link/src/emqx_cluster_link_extrouter.erl b/apps/emqx_cluster_link/src/emqx_cluster_link_extrouter.erl index c45b12ae0..3e2ff1804 100644 --- a/apps/emqx_cluster_link/src/emqx_cluster_link_extrouter.erl +++ b/apps/emqx_cluster_link/src/emqx_cluster_link_extrouter.erl @@ -282,7 +282,7 @@ apply_operation(ActorID, Entry, MCounter, OpName, Lane) -> case MCounter band Marker of 0 when OpName =:= add -> Res = mria:dirty_update_counter(?EXTROUTE_TAB, Entry, Marker), - _ = emqx_cluster_link:routes_inc(ClusterName, 1), + _ = emqx_cluster_link_metrics:routes_inc(ClusterName, 1), ?tp("cluster_link_extrouter_route_added", #{}), Res; Marker when OpName =:= add -> @@ -293,7 +293,7 @@ apply_operation(ActorID, Entry, MCounter, OpName, Lane) -> 0 -> Record = #extroute{entry = Entry, mcounter = 0}, ok = mria:dirty_delete_object(?EXTROUTE_TAB, Record), - _ = emqx_cluster_link:routes_inc(ClusterName, -1), + _ = emqx_cluster_link_metrics:routes_inc(ClusterName, -1), ?tp("cluster_link_extrouter_route_deleted", #{}), 0; C -> diff --git a/apps/emqx_cluster_link/src/emqx_cluster_link_metrics.erl b/apps/emqx_cluster_link/src/emqx_cluster_link_metrics.erl new file mode 100644 index 000000000..3d6f1edc8 --- /dev/null +++ b/apps/emqx_cluster_link/src/emqx_cluster_link_metrics.erl @@ -0,0 +1,67 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- +-module(emqx_cluster_link_metrics). + +-include("emqx_cluster_link.hrl"). + +%% API +-export([ + maybe_create_metrics/1, + drop_metrics/1, + + get_metrics/1, + routes_inc/2 +]). + +%%-------------------------------------------------------------------- +%% Type definitions +%%-------------------------------------------------------------------- + +-define(METRICS, [ + ?route_metric +]). +-define(RATE_METRICS, []). + +%%-------------------------------------------------------------------- +%% metrics API +%%-------------------------------------------------------------------- + +get_metrics(ClusterName) -> + Nodes = emqx:running_nodes(), + Timeout = 15_000, + Results = emqx_metrics_proto_v2:get_metrics(Nodes, ?METRIC_NAME, ClusterName, Timeout), + sequence_multicall_results(Nodes, Results). + +maybe_create_metrics(ClusterName) -> + case emqx_metrics_worker:has_metrics(?METRIC_NAME, ClusterName) of + true -> + ok = emqx_metrics_worker:reset_metrics(?METRIC_NAME, ClusterName); + false -> + ok = emqx_metrics_worker:create_metrics( + ?METRIC_NAME, ClusterName, ?METRICS, ?RATE_METRICS + ) + end. + +drop_metrics(ClusterName) -> + ok = emqx_metrics_worker:clear_metrics(?METRIC_NAME, ClusterName). + +routes_inc(ClusterName, Val) -> + catch emqx_metrics_worker:inc(?METRIC_NAME, ClusterName, ?route_metric, Val). + +%%-------------------------------------------------------------------- +%% Internal functions +%%-------------------------------------------------------------------- + +-spec sequence_multicall_results([node()], emqx_rpc:erpc_multicall(term())) -> + {ok, [{node(), term()}]} | {error, [term()]}. +sequence_multicall_results(Nodes, Results) -> + case lists:partition(fun is_ok/1, lists:zip(Nodes, Results)) of + {OkResults, []} -> + {ok, [{Node, Res} || {Node, {ok, Res}} <- OkResults]}; + {_OkResults, BadResults} -> + {error, BadResults} + end. + +is_ok({_Node, {ok, _}}) -> true; +is_ok(_) -> false.