From 6acdc6e432fbc85169e17a3ee5ef389de9f05f08 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Mon, 27 Nov 2023 11:32:23 -0300 Subject: [PATCH] feat: add `/actions/:id/metrics/`, `/actions/:id/metrics/reset` APIs Fixes https://emqx.atlassian.net/browse/EMQX-11381 --- apps/emqx_bridge/src/emqx_bridge_v2_api.erl | 186 ++++++++++++++++- .../src/proto/emqx_bridge_proto_v5.erl | 14 +- .../test/emqx_bridge_v2_api_SUITE.erl | 192 +++++++++++++----- 3 files changed, 341 insertions(+), 51 deletions(-) diff --git a/apps/emqx_bridge/src/emqx_bridge_v2_api.erl b/apps/emqx_bridge/src/emqx_bridge_v2_api.erl index cb1f7cc62..13e84f84e 100644 --- a/apps/emqx_bridge/src/emqx_bridge_v2_api.erl +++ b/apps/emqx_bridge/src/emqx_bridge_v2_api.erl @@ -21,6 +21,7 @@ -include_lib("hocon/include/hoconsc.hrl"). -include_lib("emqx/include/logger.hrl"). -include_lib("emqx_utils/include/emqx_utils_api.hrl"). +-include_lib("emqx_bridge/include/emqx_bridge.hrl"). -import(hoconsc, [mk/2, array/1, enum/1]). -import(emqx_utils, [redact/1]). @@ -37,6 +38,8 @@ -export([ '/actions'/2, '/actions/:id'/2, + '/actions/:id/metrics'/2, + '/actions/:id/metrics/reset'/2, '/actions/:id/enable/:enable'/2, '/actions/:id/:operation'/2, '/nodes/:node/actions/:id/:operation'/2, @@ -44,8 +47,8 @@ '/action_types'/2 ]). -%% BpAPI --export([lookup_from_local_node/2]). +%% BpAPI / RPC Targets +-export([lookup_from_local_node/2, get_metrics_from_local_node/2]). -define(BRIDGE_NOT_FOUND(BRIDGE_TYPE, BRIDGE_NAME), ?NOT_FOUND( @@ -80,6 +83,10 @@ paths() -> "/actions/:id/enable/:enable", "/actions/:id/:operation", "/nodes/:node/actions/:id/:operation", + %% Caveat: metrics paths must come *after* `/:operation', otherwise minirest will + %% try to match the latter first, trying to interpret `metrics' as an operation... + "/actions/:id/metrics", + "/actions/:id/metrics/reset", "/actions_probe", "/action_types" ]. @@ -247,6 +254,34 @@ schema("/actions/:id") -> } } }; +schema("/actions/:id/metrics") -> + #{ + 'operationId' => '/actions/:id/metrics', + get => #{ + tags => [<<"actions">>], + summary => <<"Get action metrics">>, + description => ?DESC("desc_bridge_metrics"), + parameters => [param_path_id()], + responses => #{ + 200 => emqx_bridge_schema:metrics_fields(), + 404 => error_schema('NOT_FOUND', "Action not found") + } + } + }; +schema("/actions/:id/metrics/reset") -> + #{ + 'operationId' => '/actions/:id/metrics/reset', + put => #{ + tags => [<<"actions">>], + summary => <<"Reset action metrics">>, + description => ?DESC("desc_api6"), + parameters => [param_path_id()], + responses => #{ + 204 => <<"Reset success">>, + 404 => error_schema('NOT_FOUND', "Action not found") + } + } + }; schema("/actions/:id/enable/:enable") -> #{ 'operationId' => '/actions/:id/enable/:enable', @@ -429,6 +464,19 @@ schema("/action_types") -> end ). +'/actions/:id/metrics'(get, #{bindings := #{id := Id}}) -> + ?TRY_PARSE_ID(Id, get_metrics_from_all_nodes(BridgeType, BridgeName)). + +'/actions/:id/metrics/reset'(put, #{bindings := #{id := Id}}) -> + ?TRY_PARSE_ID( + Id, + begin + ActionType = emqx_bridge_v2:bridge_v2_type_to_connector_type(BridgeType), + ok = emqx_bridge_v2:reset_metrics(ActionType, BridgeName), + ?NO_CONTENT + end + ). + '/actions/:id/enable/:enable'(put, #{bindings := #{id := Id, enable := Enable}}) -> ?TRY_PARSE_ID( Id, @@ -570,6 +618,18 @@ lookup_from_all_nodes(BridgeType, BridgeName, SuccCode) -> ?INTERNAL_ERROR(Reason) end. +get_metrics_from_all_nodes(ActionType, ActionName) -> + Nodes = emqx:running_nodes(), + Result = maybe_unwrap( + emqx_bridge_proto_v5:v2_get_metrics_from_all_nodes(Nodes, ActionType, ActionName) + ), + case Result of + Metrics when is_list(Metrics) -> + {200, format_bridge_metrics(lists:zip(Nodes, Metrics))}; + {error, Reason} -> + ?INTERNAL_ERROR(Reason) + end. + operation_func(all, start) -> v2_start_bridge_to_all_nodes; operation_func(_Node, start) -> v2_start_bridge_to_node. @@ -720,12 +780,17 @@ aggregate_status(AllStatus) -> false -> inconsistent end. +%% RPC Target lookup_from_local_node(BridgeType, BridgeName) -> case emqx_bridge_v2:lookup(BridgeType, BridgeName) of {ok, Res} -> {ok, format_resource(Res, node())}; Error -> Error end. +%% RPC Target +get_metrics_from_local_node(ActionType, ActionName) -> + format_metrics(emqx_bridge_v2:get_metrics(ActionType, ActionName)). + %% resource format_resource( #{ @@ -751,6 +816,123 @@ format_resource( ) ). +format_metrics(#{ + counters := #{ + 'dropped' := Dropped, + 'dropped.other' := DroppedOther, + 'dropped.expired' := DroppedExpired, + 'dropped.queue_full' := DroppedQueueFull, + 'dropped.resource_not_found' := DroppedResourceNotFound, + 'dropped.resource_stopped' := DroppedResourceStopped, + 'matched' := Matched, + 'retried' := Retried, + 'late_reply' := LateReply, + 'failed' := SentFailed, + 'success' := SentSucc, + 'received' := Rcvd + }, + gauges := Gauges, + rate := #{ + matched := #{current := Rate, last5m := Rate5m, max := RateMax} + } +}) -> + Queued = maps:get('queuing', Gauges, 0), + SentInflight = maps:get('inflight', Gauges, 0), + ?METRICS( + Dropped, + DroppedOther, + DroppedExpired, + DroppedQueueFull, + DroppedResourceNotFound, + DroppedResourceStopped, + Matched, + Queued, + Retried, + LateReply, + SentFailed, + SentInflight, + SentSucc, + Rate, + Rate5m, + RateMax, + Rcvd + ); +format_metrics(_Metrics) -> + %% Empty metrics: can happen when a node joins another and a + %% bridge is not yet replicated to it, so the counters map is + %% empty. + empty_metrics(). + +empty_metrics() -> + ?METRICS( + _Dropped = 0, + _DroppedOther = 0, + _DroppedExpired = 0, + _DroppedQueueFull = 0, + _DroppedResourceNotFound = 0, + _DroppedResourceStopped = 0, + _Matched = 0, + _Queued = 0, + _Retried = 0, + _LateReply = 0, + _SentFailed = 0, + _SentInflight = 0, + _SentSucc = 0, + _Rate = 0, + _Rate5m = 0, + _RateMax = 0, + _Rcvd = 0 + ). + +format_bridge_metrics(Bridges) -> + NodeMetrics = lists:filtermap( + fun + ({Node, Metrics}) when is_map(Metrics) -> + {true, #{node => Node, metrics => Metrics}}; + ({Node, _}) -> + {true, #{node => Node, metrics => empty_metrics()}} + end, + Bridges + ), + #{ + metrics => aggregate_metrics(NodeMetrics), + node_metrics => NodeMetrics + }. + +aggregate_metrics(AllMetrics) -> + InitMetrics = ?EMPTY_METRICS, + lists:foldl(fun aggregate_metrics/2, InitMetrics, AllMetrics). + +aggregate_metrics( + #{ + metrics := ?metrics( + M1, M2, M3, M4, M5, M6, M7, M8, M9, M10, M11, M12, M13, M14, M15, M16, M17 + ) + }, + ?metrics( + N1, N2, N3, N4, N5, N6, N7, N8, N9, N10, N11, N12, N13, N14, N15, N16, N17 + ) +) -> + ?METRICS( + M1 + N1, + M2 + N2, + M3 + N3, + M4 + N4, + M5 + N5, + M6 + N6, + M7 + N7, + M8 + N8, + M9 + N9, + M10 + N10, + M11 + N11, + M12 + N12, + M13 + N13, + M14 + N14, + M15 + N15, + M16 + N16, + M17 + N17 + ). + format_bridge_status_and_error(Data) -> maps:fold(fun format_resource_data/3, #{}, maps:with([status, error], Data)). diff --git a/apps/emqx_bridge/src/proto/emqx_bridge_proto_v5.erl b/apps/emqx_bridge/src/proto/emqx_bridge_proto_v5.erl index 1417615a7..75b99f0ec 100644 --- a/apps/emqx_bridge/src/proto/emqx_bridge_proto_v5.erl +++ b/apps/emqx_bridge/src/proto/emqx_bridge_proto_v5.erl @@ -34,7 +34,8 @@ v2_start_bridge_to_node/3, v2_start_bridge_to_all_nodes/3, v2_list_bridges_on_nodes/1, - v2_lookup_from_all_nodes/3 + v2_lookup_from_all_nodes/3, + v2_get_metrics_from_all_nodes/3 ]). -include_lib("emqx/include/bpapi.hrl"). @@ -156,6 +157,17 @@ v2_lookup_from_all_nodes(Nodes, BridgeType, BridgeName) -> ?TIMEOUT ). +-spec v2_get_metrics_from_all_nodes([node()], key(), key()) -> + emqx_rpc:erpc_multicall(). +v2_get_metrics_from_all_nodes(Nodes, ActionType, ActionName) -> + erpc:multicall( + Nodes, + emqx_bridge_v2_api, + get_metrics_from_local_node, + [ActionType, ActionName], + ?TIMEOUT + ). + -spec v2_start_bridge_to_all_nodes([node()], key(), key()) -> emqx_rpc:erpc_multicall(). v2_start_bridge_to_all_nodes(Nodes, BridgeType, BridgeName) -> diff --git a/apps/emqx_bridge/test/emqx_bridge_v2_api_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_v2_api_SUITE.erl index cf58eefde..8758c325d 100644 --- a/apps/emqx_bridge/test/emqx_bridge_v2_api_SUITE.erl +++ b/apps/emqx_bridge/test/emqx_bridge_v2_api_SUITE.erl @@ -56,6 +56,7 @@ -define(CONNECTOR(Name), ?KAFKA_CONNECTOR(Name, ?KAFKA_BOOTSTRAP_HOST)). -define(CONNECTOR, ?CONNECTOR(?CONNECTOR_NAME)). +-define(MQTT_LOCAL_TOPIC, <<"mqtt/local/topic">>). -define(BRIDGE_NAME, (atom_to_binary(?FUNCTION_NAME))). -define(BRIDGE_TYPE_STR, "kafka_producer"). -define(BRIDGE_TYPE, <>). @@ -93,7 +94,7 @@ <<"required_acks">> => <<"all_isr">>, <<"topic">> => <<"kafka-topic">> }, - <<"local_topic">> => <<"mqtt/local/topic">>, + <<"local_topic">> => ?MQTT_LOCAL_TOPIC, <<"resource_opts">> => #{ <<"health_check_interval">> => <<"32s">> } @@ -105,48 +106,6 @@ ). -define(KAFKA_BRIDGE_UPDATE(Name), ?KAFKA_BRIDGE_UPDATE(Name, ?CONNECTOR_NAME)). -%% -define(BRIDGE_TYPE_MQTT, <<"mqtt">>). -%% -define(MQTT_BRIDGE(SERVER, NAME), ?BRIDGE(NAME, ?BRIDGE_TYPE_MQTT)#{ -%% <<"server">> => SERVER, -%% <<"username">> => <<"user1">>, -%% <<"password">> => <<"">>, -%% <<"proto_ver">> => <<"v5">>, -%% <<"egress">> => #{ -%% <<"remote">> => #{ -%% <<"topic">> => <<"emqx/${topic}">>, -%% <<"qos">> => <<"${qos}">>, -%% <<"retain">> => false -%% } -%% } -%% }). -%% -define(MQTT_BRIDGE(SERVER), ?MQTT_BRIDGE(SERVER, <<"mqtt_egress_test_bridge">>)). - -%% -define(BRIDGE_TYPE_HTTP, <<"kafka">>). -%% -define(HTTP_BRIDGE(URL, NAME), ?BRIDGE(NAME, ?BRIDGE_TYPE_HTTP)#{ -%% <<"url">> => URL, -%% <<"local_topic">> => <<"emqx_webhook/#">>, -%% <<"method">> => <<"post">>, -%% <<"body">> => <<"${payload}">>, -%% <<"headers">> => #{ -%% % NOTE -%% % The Pascal-Case is important here. -%% % The reason is kinda ridiculous: `emqx_bridge_resource:create_dry_run/2` converts -%% % bridge config keys into atoms, and the atom 'Content-Type' exists in the ERTS -%% % when this happens (while the 'content-type' does not). -%% <<"Content-Type">> => <<"application/json">> -%% } -%% }). -%% -define(HTTP_BRIDGE(URL), ?HTTP_BRIDGE(URL, ?BRIDGE_NAME)). - -%% -define(URL(PORT, PATH), -%% list_to_binary( -%% io_lib:format( -%% "http://localhost:~s/~s", -%% [integer_to_list(PORT), PATH] -%% ) -%% ) -%% ). - -define(APPSPECS, [ emqx_conf, emqx, @@ -166,7 +125,7 @@ all() -> [ {group, single}, - %{group, cluster_later_join}, + {group, cluster_later_join}, {group, cluster} ]. -else. @@ -182,7 +141,7 @@ groups() -> t_fix_broken_bridge_config ], ClusterLaterJoinOnlyTCs = [ - % t_cluster_later_join_metrics + t_cluster_later_join_metrics ], [ {single, [], AllTCs -- ClusterLaterJoinOnlyTCs}, @@ -202,9 +161,9 @@ end_per_suite(_Config) -> init_per_group(cluster = Name, Config) -> Nodes = [NodePrimary | _] = mk_cluster(Name, Config), init_api([{group, Name}, {cluster_nodes, Nodes}, {node, NodePrimary} | Config]); -%% init_per_group(cluster_later_join = Name, Config) -> -%% Nodes = [NodePrimary | _] = mk_cluster(Name, Config, #{join_to => undefined}), -%% init_api([{group, Name}, {cluster_nodes, Nodes}, {node, NodePrimary} | Config]); +init_per_group(cluster_later_join = Name, Config) -> + Nodes = [NodePrimary | _] = mk_cluster(Name, Config, #{join_to => undefined}), + init_api([{group, Name}, {cluster_nodes, Nodes}, {node, NodePrimary} | Config]); init_per_group(Name, Config) -> WorkDir = filename:join(?config(priv_dir, Config), Name), Apps = emqx_cth_suite:start(?APPSPECS ++ [?APPSPEC_DASHBOARD], #{work_dir => WorkDir}), @@ -1041,6 +1000,143 @@ t_bad_name(Config) -> ), ok. +t_metrics(Config) -> + {ok, 200, []} = request_json(get, uri([?ROOT]), Config), + + ActionName = ?BRIDGE_NAME, + ?assertMatch( + {ok, 201, _}, + request_json( + post, + uri([?ROOT]), + ?KAFKA_BRIDGE(?BRIDGE_NAME), + Config + ) + ), + + ActionID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE, ActionName), + + ?assertMatch( + {ok, 200, #{ + <<"metrics">> := #{<<"matched">> := 0}, + <<"node_metrics">> := [#{<<"metrics">> := #{<<"matched">> := 0}} | _] + }}, + request_json(get, uri([?ROOT, ActionID, "metrics"]), Config) + ), + + {ok, 200, Bridge} = request_json(get, uri([?ROOT, ActionID]), Config), + ?assertNot(maps:is_key(<<"metrics">>, Bridge)), + ?assertNot(maps:is_key(<<"node_metrics">>, Bridge)), + + Body = <<"my msg">>, + _ = publish_message(?MQTT_LOCAL_TOPIC, Body, Config), + + %% check for non-empty bridge metrics + ?retry( + _Sleep0 = 200, + _Retries0 = 20, + ?assertMatch( + {ok, 200, #{ + <<"metrics">> := #{<<"matched">> := 1}, + <<"node_metrics">> := [#{<<"metrics">> := #{<<"matched">> := 1}} | _] + }}, + request_json(get, uri([?ROOT, ActionID, "metrics"]), Config) + ) + ), + + %% check for absence of metrics when listing all bridges + {ok, 200, Bridges} = request_json(get, uri([?ROOT]), Config), + ?assertNotMatch( + [ + #{ + <<"metrics">> := #{}, + <<"node_metrics">> := [_ | _] + } + ], + Bridges + ), + ok. + +t_reset_metrics(Config) -> + %% assert there's no bridges at first + {ok, 200, []} = request_json(get, uri([?ROOT]), Config), + + ActionName = ?BRIDGE_NAME, + ?assertMatch( + {ok, 201, _}, + request_json( + post, + uri([?ROOT]), + ?KAFKA_BRIDGE(?BRIDGE_NAME), + Config + ) + ), + ActionID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE, ActionName), + + Body = <<"my msg">>, + _ = publish_message(?MQTT_LOCAL_TOPIC, Body, Config), + ?retry( + _Sleep0 = 200, + _Retries0 = 20, + ?assertMatch( + {ok, 200, #{ + <<"metrics">> := #{<<"matched">> := 1}, + <<"node_metrics">> := [#{<<"metrics">> := #{}} | _] + }}, + request_json(get, uri([?ROOT, ActionID, "metrics"]), Config) + ) + ), + + {ok, 204, <<>>} = request(put, uri([?ROOT, ActionID, "metrics", "reset"]), Config), + + ?retry( + _Sleep0 = 200, + _Retries0 = 20, + ?assertMatch( + {ok, 200, #{ + <<"metrics">> := #{<<"matched">> := 0}, + <<"node_metrics">> := [#{<<"metrics">> := #{}} | _] + }}, + request_json(get, uri([?ROOT, ActionID, "metrics"]), Config) + ) + ), + + ok. + +t_cluster_later_join_metrics(Config) -> + [PrimaryNode, OtherNode | _] = ?config(cluster_nodes, Config), + Name = ?BRIDGE_NAME, + ActionParams = ?KAFKA_BRIDGE(Name), + ActionID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE, Name), + ?check_trace( + begin + %% Create a bridge on only one of the nodes. + ?assertMatch({ok, 201, _}, request_json(post, uri([?ROOT]), ActionParams, Config)), + %% Pre-condition. + ?assertMatch( + {ok, 200, #{ + <<"metrics">> := #{<<"success">> := _}, + <<"node_metrics">> := [#{<<"metrics">> := #{}} | _] + }}, + request_json(get, uri([?ROOT, ActionID, "metrics"]), Config) + ), + %% Now join the other node join with the api node. + ok = erpc:call(OtherNode, ekka, join, [PrimaryNode]), + %% Check metrics; shouldn't crash even if the bridge is not + %% ready on the node that just joined the cluster. + ?assertMatch( + {ok, 200, #{ + <<"metrics">> := #{<<"success">> := _}, + <<"node_metrics">> := [#{<<"metrics">> := #{}}, #{<<"metrics">> := #{}} | _] + }}, + request_json(get, uri([?ROOT, ActionID, "metrics"]), Config) + ), + ok + end, + [] + ), + ok. + %%% helpers listen_on_random_port() -> SockOpts = [binary, {active, false}, {packet, raw}, {reuseaddr, true}, {backlog, 1000}],