diff --git a/apps/emqx_bridge/i18n/emqx_bridge_api.conf b/apps/emqx_bridge/i18n/emqx_bridge_api.conf index e8bb2403a..796cbb91c 100644 --- a/apps/emqx_bridge/i18n/emqx_bridge_api.conf +++ b/apps/emqx_bridge/i18n/emqx_bridge_api.conf @@ -134,4 +134,14 @@ NOTE:不允许在单节点上启用/禁用 Bridge""" } } + desc_bridge_metrics { + desc { + en: """Get bridge metrics by Id""" + zh: """""" + } + label: { + en: "Get Bridge Metrics" + zh: "" + } + } } diff --git a/apps/emqx_bridge/src/emqx_bridge_api.erl b/apps/emqx_bridge/src/emqx_bridge_api.erl index a71142bc5..b05e31b11 100644 --- a/apps/emqx_bridge/src/emqx_bridge_api.erl +++ b/apps/emqx_bridge/src/emqx_bridge_api.erl @@ -38,7 +38,8 @@ '/bridges/:id'/2, '/bridges/:id/:operation'/2, '/nodes/:node/bridges/:id/:operation'/2, - '/bridges/:id/reset_metrics'/2 + '/bridges/:id/metrics'/2, + '/bridges/:id/metrics/reset'/2 ]). -export([lookup_from_local_node/2]). @@ -68,7 +69,8 @@ paths() -> "/bridges/:id", "/bridges/:id/:operation", "/nodes/:node/bridges/:id/:operation", - "/bridges/:id/reset_metrics" + "/bridges/:id/metrics", + "/bridges/:id/metrics/reset" ]. error_schema(Code, Message) when is_atom(Code) -> @@ -132,19 +134,22 @@ param_path_id() -> } )}. -bridge_info_array_example(Method) -> - [Config || #{value := Config} <- maps:values(bridge_info_examples(Method))]. +bridge_info_array_example(Method, WithMetrics) -> + [Config || #{value := Config} <- maps:values(bridge_info_examples(Method, WithMetrics))]. bridge_info_examples(Method) -> + bridge_info_examples(Method, false). + +bridge_info_examples(Method, WithMetrics) -> maps:merge( #{ <<"webhook_example">> => #{ summary => <<"WebHook">>, - value => info_example(webhook, Method) + value => info_example(webhook, Method, WithMetrics) }, <<"mqtt_example">> => #{ summary => <<"MQTT Bridge">>, - value => info_example(mqtt, Method) + value => info_example(mqtt, Method, WithMetrics) } }, ee_bridge_examples(Method) @@ -157,24 +162,24 @@ ee_bridge_examples(Method) -> _:_ -> #{} end. -info_example(Type, Method) -> +info_example(Type, Method, WithMetrics) -> maps:merge( info_example_basic(Type), - method_example(Type, Method) + method_example(Type, Method, WithMetrics) ). -method_example(Type, Method) when Method == get; Method == post -> +method_example(Type, Method, WithMetrics) when Method == get; Method == post -> SType = atom_to_list(Type), SName = SType ++ "_example", TypeNameExam = #{ type => bin(SType), name => bin(SName) }, - maybe_with_metrics_example(TypeNameExam, Method); -method_example(_Type, put) -> + maybe_with_metrics_example(TypeNameExam, Method, WithMetrics); +method_example(_Type, put, _WithMetrics) -> #{}. -maybe_with_metrics_example(TypeNameExam, get) -> +maybe_with_metrics_example(TypeNameExam, get, true) -> TypeNameExam#{ metrics => ?EMPTY_METRICS, node_metrics => [ @@ -184,7 +189,7 @@ maybe_with_metrics_example(TypeNameExam, get) -> } ] }; -maybe_with_metrics_example(TypeNameExam, _) -> +maybe_with_metrics_example(TypeNameExam, _, _) -> TypeNameExam. info_example_basic(webhook) -> @@ -274,7 +279,7 @@ schema("/bridges") -> responses => #{ 200 => emqx_dashboard_swagger:schema_with_example( array(emqx_bridge_schema:get_response()), - bridge_info_array_example(get) + bridge_info_array_example(get, true) ) } }, @@ -334,9 +339,23 @@ schema("/bridges/:id") -> } } }; -schema("/bridges/:id/reset_metrics") -> +schema("/bridges/:id/metrics") -> #{ - 'operationId' => '/bridges/:id/reset_metrics', + 'operationId' => '/bridges/:id/metrics', + get => #{ + tags => [<<"bridges">>], + summary => <<"Get Bridge Metrics">>, + description => ?DESC("desc_bridge_metrics"), + parameters => [param_path_id()], + responses => #{ + 200 => emqx_bridge_schema:metrics_fields(), + 404 => error_schema('NOT_FOUND', "Bridge not found") + } + } + }; +schema("/bridges/:id/metrics/reset") -> + #{ + 'operationId' => '/bridges/:id/metrics/reset', put => #{ tags => [<<"bridges">>], summary => <<"Reset Bridge Metrics">>, @@ -455,7 +474,10 @@ schema("/nodes/:node/bridges/:id/:operation") -> end ). -'/bridges/:id/reset_metrics'(put, #{bindings := #{id := Id}}) -> +'/bridges/:id/metrics'(get, #{bindings := #{id := Id}}) -> + ?TRY_PARSE_ID(Id, lookup_from_all_nodes_metrics(BridgeType, BridgeName, 200)). + +'/bridges/:id/metrics/reset'(put, #{bindings := #{id := Id}}) -> ?TRY_PARSE_ID( Id, case @@ -469,10 +491,18 @@ schema("/nodes/:node/bridges/:id/:operation") -> ). lookup_from_all_nodes(BridgeType, BridgeName, SuccCode) -> + FormatFun = fun format_bridge_info_without_metrics/1, + do_lookup_from_all_nodes(BridgeType, BridgeName, SuccCode, FormatFun). + +lookup_from_all_nodes_metrics(BridgeType, BridgeName, SuccCode) -> + FormatFun = fun format_bridge_metrics/1, + do_lookup_from_all_nodes(BridgeType, BridgeName, SuccCode, FormatFun). + +do_lookup_from_all_nodes(BridgeType, BridgeName, SuccCode, FormatFun) -> Nodes = mria_mnesia:running_nodes(), case is_ok(emqx_bridge_proto_v1:lookup_from_all_nodes(Nodes, BridgeType, BridgeName)) of {ok, [{ok, _} | _] = Results} -> - {SuccCode, format_bridge_info([R || {ok, R} <- Results])}; + {SuccCode, FormatFun([R || {ok, R} <- Results])}; {ok, [{error, not_found} | _]} -> {404, error_msg('NOT_FOUND', <<"not_found">>)}; {error, ErrL} -> @@ -572,7 +602,7 @@ zip_bridges([BridgesFirstNode | _] = BridgesAllNodes) -> lists:foldl( fun(#{type := Type, name := Name}, Acc) -> Bridges = pick_bridges_by_id(Type, Name, BridgesAllNodes), - [format_bridge_info(Bridges) | Acc] + [format_bridge_info_with_metrics(Bridges) | Acc] end, [], BridgesFirstNode @@ -606,7 +636,7 @@ pick_bridges_by_id(Type, Name, BridgesAllNodes) -> BridgesAllNodes ). -format_bridge_info([FirstBridge | _] = Bridges) -> +format_bridge_info_with_metrics([FirstBridge | _] = Bridges) -> Res = maps:remove(node, FirstBridge), NodeStatus = collect_status(Bridges), NodeMetrics = collect_metrics(Bridges), @@ -617,6 +647,14 @@ format_bridge_info([FirstBridge | _] = Bridges) -> node_metrics => NodeMetrics }). +format_bridge_info_without_metrics(Bridges) -> + Res = format_bridge_info_with_metrics(Bridges), + maps:without([metrics, node_metrics], Res). + +format_bridge_metrics(Bridges) -> + Res = format_bridge_info_with_metrics(Bridges), + maps:with([metrics, node_metrics], Res). + collect_status(Bridges) -> [maps:with([node, status], B) || B <- Bridges]. diff --git a/apps/emqx_bridge/src/schema/emqx_bridge_mqtt_schema.erl b/apps/emqx_bridge/src/schema/emqx_bridge_mqtt_schema.erl index 4665a3bc5..5cd1693c7 100644 --- a/apps/emqx_bridge/src/schema/emqx_bridge_mqtt_schema.erl +++ b/apps/emqx_bridge/src/schema/emqx_bridge_mqtt_schema.erl @@ -51,7 +51,7 @@ fields("post") -> fields("put") -> fields("config"); fields("get") -> - emqx_bridge_schema:metrics_status_fields() ++ fields("config"). + emqx_bridge_schema:status_fields() ++ fields("config"). desc("config") -> ?DESC("config"); diff --git a/apps/emqx_bridge/src/schema/emqx_bridge_schema.erl b/apps/emqx_bridge/src/schema/emqx_bridge_schema.erl index 845c1ef90..09a99488e 100644 --- a/apps/emqx_bridge/src/schema/emqx_bridge_schema.erl +++ b/apps/emqx_bridge/src/schema/emqx_bridge_schema.erl @@ -30,7 +30,8 @@ -export([ common_bridge_fields/0, - metrics_status_fields/0 + status_fields/0, + metrics_fields/0 ]). %%====================================================================================== @@ -83,19 +84,23 @@ common_bridge_fields() -> )} ]. -metrics_status_fields() -> +status_fields() -> + [ + {"status", mk(status(), #{desc => ?DESC("desc_status")})}, + {"node_status", + mk( + hoconsc:array(ref(?MODULE, "node_status")), + #{desc => ?DESC("desc_node_status")} + )} + ]. + +metrics_fields() -> [ {"metrics", mk(ref(?MODULE, "metrics"), #{desc => ?DESC("desc_metrics")})}, {"node_metrics", mk( hoconsc:array(ref(?MODULE, "node_metrics")), #{desc => ?DESC("desc_node_metrics")} - )}, - {"status", mk(status(), #{desc => ?DESC("desc_status")})}, - {"node_status", - mk( - hoconsc:array(ref(?MODULE, "node_status")), - #{desc => ?DESC("desc_node_status")} )} ]. diff --git a/apps/emqx_bridge/src/schema/emqx_bridge_webhook_schema.erl b/apps/emqx_bridge/src/schema/emqx_bridge_webhook_schema.erl index 0495911e7..b495436a4 100644 --- a/apps/emqx_bridge/src/schema/emqx_bridge_webhook_schema.erl +++ b/apps/emqx_bridge/src/schema/emqx_bridge_webhook_schema.erl @@ -38,7 +38,7 @@ fields("post") -> fields("put") -> fields("config"); fields("get") -> - emqx_bridge_schema:metrics_status_fields() ++ fields("post"); + emqx_bridge_schema:status_fields() ++ fields("post"); fields("creation_opts") -> lists:filter( fun({K, _V}) -> diff --git a/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl index a6b5ece89..eb86b923f 100644 --- a/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl +++ b/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl @@ -187,8 +187,6 @@ t_http_crud_apis(Config) -> <<"enable">> := true, <<"status">> := _, <<"node_status">> := [_ | _], - <<"metrics">> := _, - <<"node_metrics">> := [_ | _], <<"url">> := URL1 } = jsx:decode(Bridge), @@ -225,8 +223,6 @@ t_http_crud_apis(Config) -> <<"enable">> := true, <<"status">> := _, <<"node_status">> := [_ | _], - <<"metrics">> := _, - <<"node_metrics">> := [_ | _], <<"url">> := URL2 }, jsx:decode(Bridge2) @@ -259,8 +255,6 @@ t_http_crud_apis(Config) -> <<"enable">> := true, <<"status">> := _, <<"node_status">> := [_ | _], - <<"metrics">> := _, - <<"node_metrics">> := [_ | _], <<"url">> := URL2 }, jsx:decode(Bridge3Str) @@ -456,8 +450,6 @@ do_start_stop_bridges(Type, Config) -> <<"enable">> := true, <<"status">> := <<"connected">>, <<"node_status">> := [_ | _], - <<"metrics">> := _, - <<"node_metrics">> := [_ | _], <<"url">> := URL1 } = jsx:decode(Bridge), BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE, Name), @@ -502,8 +494,6 @@ t_enable_disable_bridges(Config) -> <<"enable">> := true, <<"status">> := <<"connected">>, <<"node_status">> := [_ | _], - <<"metrics">> := _, - <<"node_metrics">> := [_ | _], <<"url">> := URL1 } = jsx:decode(Bridge), BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE, Name), @@ -555,12 +545,10 @@ t_reset_bridges(Config) -> <<"enable">> := true, <<"status">> := <<"connected">>, <<"node_status">> := [_ | _], - <<"metrics">> := _, - <<"node_metrics">> := [_ | _], <<"url">> := URL1 } = jsx:decode(Bridge), BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE, Name), - {ok, 200, <<"Reset success">>} = request(put, uri(["bridges", BridgeID, "reset_metrics"]), []), + {ok, 200, <<"Reset success">>} = request(put, uri(["bridges", BridgeID, "metrics/reset"]), []), %% delete the bridge {ok, 204, <<>>} = request(delete, uri(["bridges", BridgeID]), []), @@ -599,6 +587,88 @@ t_with_redact_update(_Config) -> ?assertEqual(Password, Value), ok. +t_metrics(Config) -> + Port = ?config(port, Config), + %% assert we there's no bridges at first + {ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []), + + %% then we add a webhook bridge, using POST + %% POST /bridges/ will create a bridge + URL1 = ?URL(Port, "path1"), + Name = ?BRIDGE_NAME, + {ok, 201, Bridge} = request( + post, + uri(["bridges"]), + ?HTTP_BRIDGE(URL1, ?BRIDGE_TYPE, Name) + ), + + %ct:pal("---bridge: ~p", [Bridge]), + #{ + <<"type">> := ?BRIDGE_TYPE, + <<"name">> := Name, + <<"enable">> := true, + <<"status">> := _, + <<"node_status">> := [_ | _], + <<"url">> := URL1 + } = jsx:decode(Bridge), + + BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE, Name), + + %% check for empty bridge metrics + {ok, 200, Bridge1Str} = request(get, uri(["bridges", BridgeID, "metrics"]), []), + ct:pal("HERE ~p", [jsx:decode(Bridge1Str)]), + ?assertMatch( + #{ + <<"metrics">> := #{<<"success">> := 0}, + <<"node_metrics">> := [_ | _] + }, + jsx:decode(Bridge1Str) + ), + + %% send an message to emqx and the message should be forwarded to the HTTP server + Body = <<"my msg">>, + emqx:publish(emqx_message:make(<<"emqx_webhook/1">>, Body)), + ?assert( + receive + {http_server, received, #{ + method := <<"POST">>, + path := <<"/path1">>, + body := Body + }} -> + true; + Msg -> + ct:pal("error: http got unexpected request: ~p", [Msg]), + false + after 100 -> + false + end + ), + + %% check for non-empty bridge metrics + {ok, 200, Bridge2Str} = request(get, uri(["bridges", BridgeID, "metrics"]), []), + ct:pal("HERE ~p", [jsx:decode(Bridge2Str)]), + ?assertMatch( + #{ + <<"metrics">> := #{<<"success">> := 1}, + <<"node_metrics">> := [_ | _] + }, + jsx:decode(Bridge2Str) + ), + + %% check for non-empty metrics when listing all bridges + {ok, 200, BridgesStr} = request(get, uri(["bridges"]), []), + ct:pal("HERE ~p", [jsx:decode(BridgesStr)]), + ?assertMatch( + [ + #{ + <<"metrics">> := #{<<"success">> := 1}, + <<"node_metrics">> := [_ | _] + } + ], + jsx:decode(BridgesStr) + ), + ok. + operation_path(node, Oper, BridgeID) -> uri(["nodes", node(), "bridges", BridgeID, Oper]); operation_path(cluster, Oper, BridgeID) -> diff --git a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_gcp_pubsub.erl b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_gcp_pubsub.erl index 760aba9e1..83fe31b49 100644 --- a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_gcp_pubsub.erl +++ b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_gcp_pubsub.erl @@ -124,7 +124,7 @@ fields(bridge_config) -> )} ]; fields("get") -> - emqx_bridge_schema:metrics_status_fields() ++ fields("post"); + emqx_bridge_schema:status_fields() ++ fields("post"); fields("post") -> [type_field(), name_field() | fields("config")]; fields("put") -> diff --git a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_hstreamdb.erl b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_hstreamdb.erl index 135087929..6e0c711b2 100644 --- a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_hstreamdb.erl +++ b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_hstreamdb.erl @@ -67,7 +67,7 @@ fields("post") -> fields("put") -> fields("config"); fields("get") -> - emqx_bridge_schema:metrics_status_fields() ++ fields("post"). + emqx_bridge_schema:status_fields() ++ fields("post"). field(connector) -> mk( diff --git a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_influxdb.erl b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_influxdb.erl index 62c8b6ab7..fece72e82 100644 --- a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_influxdb.erl +++ b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_influxdb.erl @@ -139,7 +139,7 @@ method_fileds(get, ConnectorType) -> influxdb_bridge_common_fields() ++ connector_fields(ConnectorType) ++ type_name_fields(ConnectorType) ++ - emqx_bridge_schema:metrics_status_fields(); + emqx_bridge_schema:status_fields(); method_fileds(put, ConnectorType) -> influxdb_bridge_common_fields() ++ connector_fields(ConnectorType). diff --git a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_kafka.erl b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_kafka.erl index 3f2f6a85f..9fae4f30a 100644 --- a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_kafka.erl +++ b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_kafka.erl @@ -67,7 +67,7 @@ fields("post") -> fields("put") -> fields("config"); fields("get") -> - emqx_bridge_schema:metrics_status_fields() ++ fields("post"); + emqx_bridge_schema:status_fields() ++ fields("post"); fields("config") -> [ {enable, mk(boolean(), #{desc => ?DESC("config_enable"), default => true})}, diff --git a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_mongodb.erl b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_mongodb.erl index bb4082681..eaf2b7da1 100644 --- a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_mongodb.erl +++ b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_mongodb.erl @@ -59,15 +59,15 @@ fields("put_sharded") -> fields("put_single") -> fields(mongodb_single); fields("get_rs") -> - emqx_bridge_schema:metrics_status_fields() ++ + emqx_bridge_schema:status_fields() ++ fields(mongodb_rs) ++ type_and_name_fields(mongodb_rs); fields("get_sharded") -> - emqx_bridge_schema:metrics_status_fields() ++ + emqx_bridge_schema:status_fields() ++ fields(mongodb_sharded) ++ type_and_name_fields(mongodb_sharded); fields("get_single") -> - emqx_bridge_schema:metrics_status_fields() ++ + emqx_bridge_schema:status_fields() ++ fields(mongodb_single) ++ type_and_name_fields(mongodb_single). diff --git a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_mysql.erl b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_mysql.erl index 114459149..71f8a8399 100644 --- a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_mysql.erl +++ b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_mysql.erl @@ -104,7 +104,7 @@ fields("post") -> fields("put") -> fields("config"); fields("get") -> - emqx_bridge_schema:metrics_status_fields() ++ fields("post"). + emqx_bridge_schema:status_fields() ++ fields("post"). desc("config") -> ?DESC("desc_config"); diff --git a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_pgsql.erl b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_pgsql.erl index be9fc9dc8..7e21d4dd7 100644 --- a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_pgsql.erl +++ b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_pgsql.erl @@ -106,7 +106,7 @@ fields("post") -> fields("put") -> fields("config"); fields("get") -> - emqx_bridge_schema:metrics_status_fields() ++ fields("post"). + emqx_bridge_schema:status_fields() ++ fields("post"). fields("post", Type) -> [type_field(Type), name_field() | fields("config")]. diff --git a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_redis.erl b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_redis.erl index 5c273e050..3a3963786 100644 --- a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_redis.erl +++ b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_redis.erl @@ -126,7 +126,7 @@ method_fileds(get, ConnectorType) -> redis_bridge_common_fields() ++ connector_fields(ConnectorType) ++ type_name_fields(ConnectorType) ++ - emqx_bridge_schema:metrics_status_fields(); + emqx_bridge_schema:status_fields(); method_fileds(put, ConnectorType) -> redis_bridge_common_fields() ++ connector_fields(ConnectorType).