refactor: move metrics logic to separate module
This commit is contained in:
parent
d9832252d8
commit
ca2d4ad2a0
|
@ -26,29 +26,11 @@
|
|||
on_message_publish/1
|
||||
]).
|
||||
|
||||
%% metrics API
|
||||
-export([
|
||||
maybe_create_metrics/1,
|
||||
drop_metrics/1,
|
||||
|
||||
get_metrics/1,
|
||||
routes_inc/2
|
||||
]).
|
||||
|
||||
-include("emqx_cluster_link.hrl").
|
||||
-include_lib("emqx/include/emqx.hrl").
|
||||
-include_lib("emqx/include/emqx_hooks.hrl").
|
||||
-include_lib("emqx/include/logger.hrl").
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Type definitions
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
-define(METRICS, [
|
||||
?route_metric
|
||||
]).
|
||||
-define(RATE_METRICS, []).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% emqx_external_broker API
|
||||
%%--------------------------------------------------------------------
|
||||
|
@ -150,32 +132,6 @@ put_hook() ->
|
|||
delete_hook() ->
|
||||
emqx_hooks:del('message.publish', {?MODULE, on_message_publish, []}).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% metrics API
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
get_metrics(ClusterName) ->
|
||||
Nodes = emqx:running_nodes(),
|
||||
Timeout = 15_000,
|
||||
Results = emqx_metrics_proto_v2:get_metrics(Nodes, ?METRIC_NAME, ClusterName, Timeout),
|
||||
sequence_multicall_results(Nodes, Results).
|
||||
|
||||
maybe_create_metrics(ClusterName) ->
|
||||
case emqx_metrics_worker:has_metrics(?METRIC_NAME, ClusterName) of
|
||||
true ->
|
||||
ok = emqx_metrics_worker:reset_metrics(?METRIC_NAME, ClusterName);
|
||||
false ->
|
||||
ok = emqx_metrics_worker:create_metrics(
|
||||
?METRIC_NAME, ClusterName, ?METRICS, ?RATE_METRICS
|
||||
)
|
||||
end.
|
||||
|
||||
drop_metrics(ClusterName) ->
|
||||
ok = emqx_metrics_worker:clear_metrics(?METRIC_NAME, ClusterName).
|
||||
|
||||
routes_inc(ClusterName, Val) ->
|
||||
catch emqx_metrics_worker:inc(?METRIC_NAME, ClusterName, ?route_metric, Val).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Internal functions
|
||||
%%--------------------------------------------------------------------
|
||||
|
@ -297,16 +253,3 @@ maybe_filter_incomming_msg(#message{topic = T} = Msg, ClusterName) ->
|
|||
true -> with_sender_name(Msg, ClusterName);
|
||||
false -> []
|
||||
end.
|
||||
|
||||
-spec sequence_multicall_results([node()], emqx_rpc:erpc_multicall(term())) ->
|
||||
{ok, [{node(), term()}]} | {error, [term()]}.
|
||||
sequence_multicall_results(Nodes, Results) ->
|
||||
case lists:partition(fun is_ok/1, lists:zip(Nodes, Results)) of
|
||||
{OkResults, []} ->
|
||||
{ok, [{Node, Res} || {Node, {ok, Res}} <- OkResults]};
|
||||
{_OkResults, BadResults} ->
|
||||
{error, BadResults}
|
||||
end.
|
||||
|
||||
is_ok({_Node, {ok, _}}) -> true;
|
||||
is_ok(_) -> false.
|
||||
|
|
|
@ -227,7 +227,7 @@ handle_lookup(Name, Link) ->
|
|||
?OK(add_status(Name, Link)).
|
||||
|
||||
handle_metrics(Name) ->
|
||||
case emqx_cluster_link:get_metrics(Name) of
|
||||
case emqx_cluster_link_metrics:get_metrics(Name) of
|
||||
{error, BadResults} ->
|
||||
?SLOG(warning, #{
|
||||
msg => "cluster_link_api_metrics_bad_erpc_results",
|
||||
|
|
|
@ -59,7 +59,7 @@ remove_msg_fwd_resources(LinksConf) ->
|
|||
create_metrics(LinksConf) ->
|
||||
lists:foreach(
|
||||
fun(#{name := ClusterName}) ->
|
||||
ok = emqx_cluster_link:maybe_create_metrics(ClusterName)
|
||||
ok = emqx_cluster_link_metrics:maybe_create_metrics(ClusterName)
|
||||
end,
|
||||
LinksConf
|
||||
).
|
||||
|
|
|
@ -280,7 +280,7 @@ add_links(LinksConf) ->
|
|||
add_link(#{name := ClusterName, enable := true} = LinkConf) ->
|
||||
{ok, _Pid} = emqx_cluster_link_sup:ensure_actor(LinkConf),
|
||||
{ok, _} = emqx_cluster_link_mqtt:ensure_msg_fwd_resource(LinkConf),
|
||||
ok = emqx_cluster_link:maybe_create_metrics(ClusterName),
|
||||
ok = emqx_cluster_link_metrics:maybe_create_metrics(ClusterName),
|
||||
ok;
|
||||
add_link(_DisabledLinkConf) ->
|
||||
ok.
|
||||
|
@ -291,7 +291,7 @@ remove_links(LinksConf) ->
|
|||
remove_link(Name) ->
|
||||
_ = emqx_cluster_link_mqtt:remove_msg_fwd_resource(Name),
|
||||
_ = ensure_actor_stopped(Name),
|
||||
emqx_cluster_link:drop_metrics(Name).
|
||||
emqx_cluster_link_metrics:drop_metrics(Name).
|
||||
|
||||
update_links(LinksConf) ->
|
||||
[update_link(Link) || Link <- LinksConf].
|
||||
|
|
|
@ -282,7 +282,7 @@ apply_operation(ActorID, Entry, MCounter, OpName, Lane) ->
|
|||
case MCounter band Marker of
|
||||
0 when OpName =:= add ->
|
||||
Res = mria:dirty_update_counter(?EXTROUTE_TAB, Entry, Marker),
|
||||
_ = emqx_cluster_link:routes_inc(ClusterName, 1),
|
||||
_ = emqx_cluster_link_metrics:routes_inc(ClusterName, 1),
|
||||
?tp("cluster_link_extrouter_route_added", #{}),
|
||||
Res;
|
||||
Marker when OpName =:= add ->
|
||||
|
@ -293,7 +293,7 @@ apply_operation(ActorID, Entry, MCounter, OpName, Lane) ->
|
|||
0 ->
|
||||
Record = #extroute{entry = Entry, mcounter = 0},
|
||||
ok = mria:dirty_delete_object(?EXTROUTE_TAB, Record),
|
||||
_ = emqx_cluster_link:routes_inc(ClusterName, -1),
|
||||
_ = emqx_cluster_link_metrics:routes_inc(ClusterName, -1),
|
||||
?tp("cluster_link_extrouter_route_deleted", #{}),
|
||||
0;
|
||||
C ->
|
||||
|
|
|
@ -0,0 +1,67 @@
|
|||
%%--------------------------------------------------------------------
|
||||
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||
%%--------------------------------------------------------------------
|
||||
-module(emqx_cluster_link_metrics).
|
||||
|
||||
-include("emqx_cluster_link.hrl").
|
||||
|
||||
%% API
|
||||
-export([
|
||||
maybe_create_metrics/1,
|
||||
drop_metrics/1,
|
||||
|
||||
get_metrics/1,
|
||||
routes_inc/2
|
||||
]).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Type definitions
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
-define(METRICS, [
|
||||
?route_metric
|
||||
]).
|
||||
-define(RATE_METRICS, []).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% metrics API
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
get_metrics(ClusterName) ->
|
||||
Nodes = emqx:running_nodes(),
|
||||
Timeout = 15_000,
|
||||
Results = emqx_metrics_proto_v2:get_metrics(Nodes, ?METRIC_NAME, ClusterName, Timeout),
|
||||
sequence_multicall_results(Nodes, Results).
|
||||
|
||||
maybe_create_metrics(ClusterName) ->
|
||||
case emqx_metrics_worker:has_metrics(?METRIC_NAME, ClusterName) of
|
||||
true ->
|
||||
ok = emqx_metrics_worker:reset_metrics(?METRIC_NAME, ClusterName);
|
||||
false ->
|
||||
ok = emqx_metrics_worker:create_metrics(
|
||||
?METRIC_NAME, ClusterName, ?METRICS, ?RATE_METRICS
|
||||
)
|
||||
end.
|
||||
|
||||
drop_metrics(ClusterName) ->
|
||||
ok = emqx_metrics_worker:clear_metrics(?METRIC_NAME, ClusterName).
|
||||
|
||||
routes_inc(ClusterName, Val) ->
|
||||
catch emqx_metrics_worker:inc(?METRIC_NAME, ClusterName, ?route_metric, Val).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Internal functions
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
-spec sequence_multicall_results([node()], emqx_rpc:erpc_multicall(term())) ->
|
||||
{ok, [{node(), term()}]} | {error, [term()]}.
|
||||
sequence_multicall_results(Nodes, Results) ->
|
||||
case lists:partition(fun is_ok/1, lists:zip(Nodes, Results)) of
|
||||
{OkResults, []} ->
|
||||
{ok, [{Node, Res} || {Node, {ok, Res}} <- OkResults]};
|
||||
{_OkResults, BadResults} ->
|
||||
{error, BadResults}
|
||||
end.
|
||||
|
||||
is_ok({_Node, {ok, _}}) -> true;
|
||||
is_ok(_) -> false.
|
Loading…
Reference in New Issue