diff --git a/apps/emqx/priv/bpapi.versions b/apps/emqx/priv/bpapi.versions index 8623ef04d..4c2fdd3eb 100644 --- a/apps/emqx/priv/bpapi.versions +++ b/apps/emqx/priv/bpapi.versions @@ -28,6 +28,7 @@ {emqx_management,2}. {emqx_management,3}. {emqx_management,4}. +{emqx_metrics,1}. {emqx_mgmt_api_plugins,1}. {emqx_mgmt_api_plugins,2}. {emqx_mgmt_cluster,1}. @@ -38,7 +39,6 @@ {emqx_node_rebalance_evacuation,1}. {emqx_node_rebalance_status,1}. {emqx_persistent_session,1}. -{emqx_plugin_libs,1}. {emqx_plugins,1}. {emqx_prometheus,1}. {emqx_resource,1}. diff --git a/apps/emqx_plugin_libs/src/proto/emqx_plugin_libs_proto_v1.erl b/apps/emqx/src/proto/emqx_metrics_proto_v1.erl similarity index 71% rename from apps/emqx_plugin_libs/src/proto/emqx_plugin_libs_proto_v1.erl rename to apps/emqx/src/proto/emqx_metrics_proto_v1.erl index 8cd9952e0..c8c92430e 100644 --- a/apps/emqx_plugin_libs/src/proto/emqx_plugin_libs_proto_v1.erl +++ b/apps/emqx/src/proto/emqx_metrics_proto_v1.erl @@ -14,25 +14,26 @@ %% limitations under the License. %%-------------------------------------------------------------------- --module(emqx_plugin_libs_proto_v1). +-module(emqx_metrics_proto_v1). -behaviour(emqx_bpapi). -export([ introduced_in/0, - get_metrics/3 + get_metrics/4 ]). --include_lib("emqx/include/bpapi.hrl"). +-include("bpapi.hrl"). introduced_in() -> - "5.0.0". + "5.1.0". -spec get_metrics( - node(), + [node()], emqx_metrics_worker:handler_name(), - emqx_metrics_worker:metric_id() -) -> emqx_metrics_worker:metrics() | {badrpc, _}. -get_metrics(Node, HandlerName, MetricId) -> - rpc:call(Node, emqx_metrics_worker, get_metrics, [HandlerName, MetricId]). + emqx_metrics_worker:metric_id(), + timeout() +) -> emqx_rpc:erpc_multicall(emqx_metrics_worker:metrics()). +get_metrics(Nodes, HandlerName, MetricId, Timeout) -> + erpc:multicall(Nodes, emqx_metrics_worker, get_metrics, [HandlerName, MetricId], Timeout). diff --git a/apps/emqx/test/emqx_bpapi_static_checks.erl b/apps/emqx/test/emqx_bpapi_static_checks.erl index c944293d2..56baf05e8 100644 --- a/apps/emqx/test/emqx_bpapi_static_checks.erl +++ b/apps/emqx/test/emqx_bpapi_static_checks.erl @@ -51,8 +51,14 @@ "gen_rpc, recon, redbug, observer_cli, snabbkaffe, ekka, mria, amqp_client, rabbit_common" ). -define(IGNORED_MODULES, "emqx_rpc"). --define(FORCE_DELETED_MODULES, [emqx_statsd, emqx_statsd_proto_v1]). --define(FORCE_DELETED_APIS, [{emqx_statsd, 1}]). +-define(FORCE_DELETED_MODULES, [ + emqx_statsd, + emqx_statsd_proto_v1 +]). +-define(FORCE_DELETED_APIS, [ + {emqx_statsd, 1}, + {emqx_plugin_libs, 1} +]). %% List of known RPC backend modules: -define(RPC_MODULES, "gen_rpc, erpc, rpc, emqx_rpc"). %% List of known functions also known to do RPC: diff --git a/apps/emqx_plugin_libs/src/emqx_plugin_libs.app.src b/apps/emqx_plugin_libs/src/emqx_plugin_libs.app.src index 82a95c377..6953acdf1 100644 --- a/apps/emqx_plugin_libs/src/emqx_plugin_libs.app.src +++ b/apps/emqx_plugin_libs/src/emqx_plugin_libs.app.src @@ -1,7 +1,7 @@ %% -*- mode: erlang -*- {application, emqx_plugin_libs, [ {description, "EMQX Plugin utility libs"}, - {vsn, "4.3.11"}, + {vsn, "4.3.12"}, {modules, []}, {applications, [kernel, stdlib]}, {env, []} diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl b/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl index 54a739b6d..b00ef387c 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl @@ -44,10 +44,13 @@ %% query callback -export([qs2ms/2, run_fuzzy_match/2, format_rule_info_resp/1]). +-define(RPC_GET_METRICS_TIMEOUT, 5000). + -define(ERR_BADARGS(REASON), begin R0 = err_msg(REASON), <<"Bad Arguments: ", R0/binary>> end). + -define(CHECK_PARAMS(PARAMS, TAG, EXPR), case emqx_rule_api_schema:check_params(PARAMS, TAG) of {ok, CheckedParams} -> @@ -56,6 +59,7 @@ end). {400, #{code => 'BAD_REQUEST', message => ?ERR_BADARGS(REASON)}} end ). + -define(METRICS( MATCH, PASS, @@ -87,6 +91,7 @@ end). 'matched.rate.last5m' => RATE_5 } ). + -define(metrics( MATCH, PASS, @@ -527,74 +532,77 @@ printable_function_name(Mod, Func) -> list_to_binary(lists:concat([Mod, ":", Func])). get_rule_metrics(Id) -> - Format = fun - ( - Node, - #{ - counters := - #{ - 'matched' := Matched, - 'passed' := Passed, - 'failed' := Failed, - 'failed.exception' := FailedEx, - 'failed.no_result' := FailedNoRes, - 'actions.total' := OTotal, - 'actions.failed' := OFailed, - 'actions.failed.out_of_service' := OFailedOOS, - 'actions.failed.unknown' := OFailedUnknown, - 'actions.success' := OFailedSucc - }, - rate := - #{ - 'matched' := - #{current := Current, max := Max, last5m := Last5M} - } - } - ) -> - #{ - metrics => ?METRICS( - Matched, - Passed, - Failed, - FailedEx, - FailedNoRes, - OTotal, - OFailed, - OFailedOOS, - OFailedUnknown, - OFailedSucc, - Current, - Max, - Last5M - ), - node => Node - }; - (Node, _Metrics) -> - %% Empty metrics: can happen when a node joins another and a bridge is not yet - %% replicated to it, so the counters map is empty. - #{ - metrics => ?METRICS( - _Matched = 0, - _Passed = 0, - _Failed = 0, - _FailedEx = 0, - _FailedNoRes = 0, - _OTotal = 0, - _OFailed = 0, - _OFailedOOS = 0, - _OFailedUnknown = 0, - _OFailedSucc = 0, - _Current = 0, - _Max = 0, - _Last5M = 0 - ), - node => Node - } - end, - [ - Format(Node, emqx_plugin_libs_proto_v1:get_metrics(Node, rule_metrics, Id)) - || Node <- mria:running_nodes() - ]. + Nodes = emqx:running_nodes(), + Results = emqx_metrics_proto_v1:get_metrics(Nodes, rule_metrics, Id, ?RPC_GET_METRICS_TIMEOUT), + NodeResults = lists:zip(Nodes, Results), + NodeMetrics = [format_metrics(Node, Metrics) || {Node, {ok, Metrics}} <- NodeResults], + NodeErrors = [Result || Result = {_Node, {NOk, _}} <- NodeResults, NOk =/= ok], + NodeErrors == [] orelse + ?SLOG(warning, #{ + msg => "rpc_get_rule_metrics_errors", + errors => NodeErrors + }), + NodeMetrics. + +format_metrics(Node, #{ + counters := + #{ + 'matched' := Matched, + 'passed' := Passed, + 'failed' := Failed, + 'failed.exception' := FailedEx, + 'failed.no_result' := FailedNoRes, + 'actions.total' := OTotal, + 'actions.failed' := OFailed, + 'actions.failed.out_of_service' := OFailedOOS, + 'actions.failed.unknown' := OFailedUnknown, + 'actions.success' := OFailedSucc + }, + rate := + #{ + 'matched' := + #{current := Current, max := Max, last5m := Last5M} + } +}) -> + #{ + metrics => ?METRICS( + Matched, + Passed, + Failed, + FailedEx, + FailedNoRes, + OTotal, + OFailed, + OFailedOOS, + OFailedUnknown, + OFailedSucc, + Current, + Max, + Last5M + ), + node => Node + }; +format_metrics(Node, _Metrics) -> + %% Empty metrics: can happen when a node joins another and a bridge is not yet + %% replicated to it, so the counters map is empty. + #{ + metrics => ?METRICS( + _Matched = 0, + _Passed = 0, + _Failed = 0, + _FailedEx = 0, + _FailedNoRes = 0, + _OTotal = 0, + _OFailed = 0, + _OFailedOOS = 0, + _OFailedUnknown = 0, + _OFailedSucc = 0, + _Current = 0, + _Max = 0, + _Last5M = 0 + ), + node => Node + }. aggregate_metrics(AllMetrics) -> InitMetrics = ?METRICS(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0),