From c2fd1a4482a30307fa986384d3d611bfcdeb03d8 Mon Sep 17 00:00:00 2001 From: Erik Timan Date: Thu, 12 Jan 2023 10:10:52 +0100 Subject: [PATCH 1/9] feat(emqx_bridge): shorten operation endpoint URLs This shortens and simplifies URLs for performing bridge operations so that the API looks more congruent. --- apps/emqx_bridge/src/emqx_bridge_api.erl | 20 +++++++++---------- .../test/emqx_bridge_api_SUITE.erl | 4 ++-- 2 files changed, 12 insertions(+), 12 deletions(-) diff --git a/apps/emqx_bridge/src/emqx_bridge_api.erl b/apps/emqx_bridge/src/emqx_bridge_api.erl index cf39ebf14..a71142bc5 100644 --- a/apps/emqx_bridge/src/emqx_bridge_api.erl +++ b/apps/emqx_bridge/src/emqx_bridge_api.erl @@ -36,8 +36,8 @@ -export([ '/bridges'/2, '/bridges/:id'/2, - '/bridges/:id/operation/:operation'/2, - '/nodes/:node/bridges/:id/operation/:operation'/2, + '/bridges/:id/:operation'/2, + '/nodes/:node/bridges/:id/:operation'/2, '/bridges/:id/reset_metrics'/2 ]). @@ -66,8 +66,8 @@ paths() -> [ "/bridges", "/bridges/:id", - "/bridges/:id/operation/:operation", - "/nodes/:node/bridges/:id/operation/:operation", + "/bridges/:id/:operation", + "/nodes/:node/bridges/:id/:operation", "/bridges/:id/reset_metrics" ]. @@ -348,9 +348,9 @@ schema("/bridges/:id/reset_metrics") -> } } }; -schema("/bridges/:id/operation/:operation") -> +schema("/bridges/:id/:operation") -> #{ - 'operationId' => '/bridges/:id/operation/:operation', + 'operationId' => '/bridges/:id/:operation', post => #{ tags => [<<"bridges">>], summary => <<"Enable/Disable/Stop/Restart Bridge">>, @@ -366,9 +366,9 @@ schema("/bridges/:id/operation/:operation") -> } } }; -schema("/nodes/:node/bridges/:id/operation/:operation") -> +schema("/nodes/:node/bridges/:id/:operation") -> #{ - 'operationId' => '/nodes/:node/bridges/:id/operation/:operation', + 'operationId' => '/nodes/:node/bridges/:id/:operation', post => #{ tags => [<<"bridges">>], summary => <<"Stop/Restart Bridge">>, @@ -485,7 +485,7 @@ lookup_from_local_node(BridgeType, BridgeName) -> Error -> Error end. -'/bridges/:id/operation/:operation'(post, #{ +'/bridges/:id/:operation'(post, #{ bindings := #{id := Id, operation := Op} }) -> @@ -513,7 +513,7 @@ lookup_from_local_node(BridgeType, BridgeName) -> end ). -'/nodes/:node/bridges/:id/operation/:operation'(post, #{ +'/nodes/:node/bridges/:id/:operation'(post, #{ bindings := #{id := Id, operation := Op, node := Node} }) -> diff --git a/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl index 4d16f1692..a6b5ece89 100644 --- a/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl +++ b/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl @@ -600,9 +600,9 @@ t_with_redact_update(_Config) -> ok. operation_path(node, Oper, BridgeID) -> - uri(["nodes", node(), "bridges", BridgeID, "operation", Oper]); + uri(["nodes", node(), "bridges", BridgeID, Oper]); operation_path(cluster, Oper, BridgeID) -> - uri(["bridges", BridgeID, "operation", Oper]). + uri(["bridges", BridgeID, Oper]). str(S) when is_list(S) -> S; str(S) when is_binary(S) -> binary_to_list(S). From 860e21d40f1b32eb4e3ac03d93679fbac6dd9e2b Mon Sep 17 00:00:00 2001 From: Erik Timan Date: Thu, 12 Jan 2023 10:39:19 +0100 Subject: [PATCH 2/9] feat(emqx_bridge): move metrics to own endpoint, rename reset_metrics In order for the /bridges APIs to be consistent with other APIs, we move out metrics from GET /bridges/{id} to its own endpoint, /bridges/{id}/metrics. We also rename /bridges/reset_metrics to /bridges/metrics/reset. --- apps/emqx_bridge/i18n/emqx_bridge_api.conf | 10 ++ apps/emqx_bridge/src/emqx_bridge_api.erl | 78 +++++++++++---- .../src/schema/emqx_bridge_mqtt_schema.erl | 2 +- .../src/schema/emqx_bridge_schema.erl | 21 ++-- .../src/schema/emqx_bridge_webhook_schema.erl | 2 +- .../test/emqx_bridge_api_SUITE.erl | 96 ++++++++++++++++--- .../src/emqx_ee_bridge_gcp_pubsub.erl | 2 +- .../src/emqx_ee_bridge_hstreamdb.erl | 2 +- .../src/emqx_ee_bridge_influxdb.erl | 2 +- .../src/emqx_ee_bridge_kafka.erl | 2 +- .../src/emqx_ee_bridge_mongodb.erl | 6 +- .../src/emqx_ee_bridge_mysql.erl | 2 +- .../src/emqx_ee_bridge_pgsql.erl | 2 +- .../src/emqx_ee_bridge_redis.erl | 2 +- 14 files changed, 176 insertions(+), 53 deletions(-) diff --git a/apps/emqx_bridge/i18n/emqx_bridge_api.conf b/apps/emqx_bridge/i18n/emqx_bridge_api.conf index e8bb2403a..796cbb91c 100644 --- a/apps/emqx_bridge/i18n/emqx_bridge_api.conf +++ b/apps/emqx_bridge/i18n/emqx_bridge_api.conf @@ -134,4 +134,14 @@ NOTE:不允许在单节点上启用/禁用 Bridge""" } } + desc_bridge_metrics { + desc { + en: """Get bridge metrics by Id""" + zh: """""" + } + label: { + en: "Get Bridge Metrics" + zh: "" + } + } } diff --git a/apps/emqx_bridge/src/emqx_bridge_api.erl b/apps/emqx_bridge/src/emqx_bridge_api.erl index a71142bc5..b05e31b11 100644 --- a/apps/emqx_bridge/src/emqx_bridge_api.erl +++ b/apps/emqx_bridge/src/emqx_bridge_api.erl @@ -38,7 +38,8 @@ '/bridges/:id'/2, '/bridges/:id/:operation'/2, '/nodes/:node/bridges/:id/:operation'/2, - '/bridges/:id/reset_metrics'/2 + '/bridges/:id/metrics'/2, + '/bridges/:id/metrics/reset'/2 ]). -export([lookup_from_local_node/2]). @@ -68,7 +69,8 @@ paths() -> "/bridges/:id", "/bridges/:id/:operation", "/nodes/:node/bridges/:id/:operation", - "/bridges/:id/reset_metrics" + "/bridges/:id/metrics", + "/bridges/:id/metrics/reset" ]. error_schema(Code, Message) when is_atom(Code) -> @@ -132,19 +134,22 @@ param_path_id() -> } )}. -bridge_info_array_example(Method) -> - [Config || #{value := Config} <- maps:values(bridge_info_examples(Method))]. +bridge_info_array_example(Method, WithMetrics) -> + [Config || #{value := Config} <- maps:values(bridge_info_examples(Method, WithMetrics))]. bridge_info_examples(Method) -> + bridge_info_examples(Method, false). + +bridge_info_examples(Method, WithMetrics) -> maps:merge( #{ <<"webhook_example">> => #{ summary => <<"WebHook">>, - value => info_example(webhook, Method) + value => info_example(webhook, Method, WithMetrics) }, <<"mqtt_example">> => #{ summary => <<"MQTT Bridge">>, - value => info_example(mqtt, Method) + value => info_example(mqtt, Method, WithMetrics) } }, ee_bridge_examples(Method) @@ -157,24 +162,24 @@ ee_bridge_examples(Method) -> _:_ -> #{} end. -info_example(Type, Method) -> +info_example(Type, Method, WithMetrics) -> maps:merge( info_example_basic(Type), - method_example(Type, Method) + method_example(Type, Method, WithMetrics) ). -method_example(Type, Method) when Method == get; Method == post -> +method_example(Type, Method, WithMetrics) when Method == get; Method == post -> SType = atom_to_list(Type), SName = SType ++ "_example", TypeNameExam = #{ type => bin(SType), name => bin(SName) }, - maybe_with_metrics_example(TypeNameExam, Method); -method_example(_Type, put) -> + maybe_with_metrics_example(TypeNameExam, Method, WithMetrics); +method_example(_Type, put, _WithMetrics) -> #{}. -maybe_with_metrics_example(TypeNameExam, get) -> +maybe_with_metrics_example(TypeNameExam, get, true) -> TypeNameExam#{ metrics => ?EMPTY_METRICS, node_metrics => [ @@ -184,7 +189,7 @@ maybe_with_metrics_example(TypeNameExam, get) -> } ] }; -maybe_with_metrics_example(TypeNameExam, _) -> +maybe_with_metrics_example(TypeNameExam, _, _) -> TypeNameExam. info_example_basic(webhook) -> @@ -274,7 +279,7 @@ schema("/bridges") -> responses => #{ 200 => emqx_dashboard_swagger:schema_with_example( array(emqx_bridge_schema:get_response()), - bridge_info_array_example(get) + bridge_info_array_example(get, true) ) } }, @@ -334,9 +339,23 @@ schema("/bridges/:id") -> } } }; -schema("/bridges/:id/reset_metrics") -> +schema("/bridges/:id/metrics") -> #{ - 'operationId' => '/bridges/:id/reset_metrics', + 'operationId' => '/bridges/:id/metrics', + get => #{ + tags => [<<"bridges">>], + summary => <<"Get Bridge Metrics">>, + description => ?DESC("desc_bridge_metrics"), + parameters => [param_path_id()], + responses => #{ + 200 => emqx_bridge_schema:metrics_fields(), + 404 => error_schema('NOT_FOUND', "Bridge not found") + } + } + }; +schema("/bridges/:id/metrics/reset") -> + #{ + 'operationId' => '/bridges/:id/metrics/reset', put => #{ tags => [<<"bridges">>], summary => <<"Reset Bridge Metrics">>, @@ -455,7 +474,10 @@ schema("/nodes/:node/bridges/:id/:operation") -> end ). -'/bridges/:id/reset_metrics'(put, #{bindings := #{id := Id}}) -> +'/bridges/:id/metrics'(get, #{bindings := #{id := Id}}) -> + ?TRY_PARSE_ID(Id, lookup_from_all_nodes_metrics(BridgeType, BridgeName, 200)). + +'/bridges/:id/metrics/reset'(put, #{bindings := #{id := Id}}) -> ?TRY_PARSE_ID( Id, case @@ -469,10 +491,18 @@ schema("/nodes/:node/bridges/:id/:operation") -> ). lookup_from_all_nodes(BridgeType, BridgeName, SuccCode) -> + FormatFun = fun format_bridge_info_without_metrics/1, + do_lookup_from_all_nodes(BridgeType, BridgeName, SuccCode, FormatFun). + +lookup_from_all_nodes_metrics(BridgeType, BridgeName, SuccCode) -> + FormatFun = fun format_bridge_metrics/1, + do_lookup_from_all_nodes(BridgeType, BridgeName, SuccCode, FormatFun). + +do_lookup_from_all_nodes(BridgeType, BridgeName, SuccCode, FormatFun) -> Nodes = mria_mnesia:running_nodes(), case is_ok(emqx_bridge_proto_v1:lookup_from_all_nodes(Nodes, BridgeType, BridgeName)) of {ok, [{ok, _} | _] = Results} -> - {SuccCode, format_bridge_info([R || {ok, R} <- Results])}; + {SuccCode, FormatFun([R || {ok, R} <- Results])}; {ok, [{error, not_found} | _]} -> {404, error_msg('NOT_FOUND', <<"not_found">>)}; {error, ErrL} -> @@ -572,7 +602,7 @@ zip_bridges([BridgesFirstNode | _] = BridgesAllNodes) -> lists:foldl( fun(#{type := Type, name := Name}, Acc) -> Bridges = pick_bridges_by_id(Type, Name, BridgesAllNodes), - [format_bridge_info(Bridges) | Acc] + [format_bridge_info_with_metrics(Bridges) | Acc] end, [], BridgesFirstNode @@ -606,7 +636,7 @@ pick_bridges_by_id(Type, Name, BridgesAllNodes) -> BridgesAllNodes ). -format_bridge_info([FirstBridge | _] = Bridges) -> +format_bridge_info_with_metrics([FirstBridge | _] = Bridges) -> Res = maps:remove(node, FirstBridge), NodeStatus = collect_status(Bridges), NodeMetrics = collect_metrics(Bridges), @@ -617,6 +647,14 @@ format_bridge_info([FirstBridge | _] = Bridges) -> node_metrics => NodeMetrics }). +format_bridge_info_without_metrics(Bridges) -> + Res = format_bridge_info_with_metrics(Bridges), + maps:without([metrics, node_metrics], Res). + +format_bridge_metrics(Bridges) -> + Res = format_bridge_info_with_metrics(Bridges), + maps:with([metrics, node_metrics], Res). + collect_status(Bridges) -> [maps:with([node, status], B) || B <- Bridges]. diff --git a/apps/emqx_bridge/src/schema/emqx_bridge_mqtt_schema.erl b/apps/emqx_bridge/src/schema/emqx_bridge_mqtt_schema.erl index 4665a3bc5..5cd1693c7 100644 --- a/apps/emqx_bridge/src/schema/emqx_bridge_mqtt_schema.erl +++ b/apps/emqx_bridge/src/schema/emqx_bridge_mqtt_schema.erl @@ -51,7 +51,7 @@ fields("post") -> fields("put") -> fields("config"); fields("get") -> - emqx_bridge_schema:metrics_status_fields() ++ fields("config"). + emqx_bridge_schema:status_fields() ++ fields("config"). desc("config") -> ?DESC("config"); diff --git a/apps/emqx_bridge/src/schema/emqx_bridge_schema.erl b/apps/emqx_bridge/src/schema/emqx_bridge_schema.erl index 845c1ef90..09a99488e 100644 --- a/apps/emqx_bridge/src/schema/emqx_bridge_schema.erl +++ b/apps/emqx_bridge/src/schema/emqx_bridge_schema.erl @@ -30,7 +30,8 @@ -export([ common_bridge_fields/0, - metrics_status_fields/0 + status_fields/0, + metrics_fields/0 ]). %%====================================================================================== @@ -83,19 +84,23 @@ common_bridge_fields() -> )} ]. -metrics_status_fields() -> +status_fields() -> + [ + {"status", mk(status(), #{desc => ?DESC("desc_status")})}, + {"node_status", + mk( + hoconsc:array(ref(?MODULE, "node_status")), + #{desc => ?DESC("desc_node_status")} + )} + ]. + +metrics_fields() -> [ {"metrics", mk(ref(?MODULE, "metrics"), #{desc => ?DESC("desc_metrics")})}, {"node_metrics", mk( hoconsc:array(ref(?MODULE, "node_metrics")), #{desc => ?DESC("desc_node_metrics")} - )}, - {"status", mk(status(), #{desc => ?DESC("desc_status")})}, - {"node_status", - mk( - hoconsc:array(ref(?MODULE, "node_status")), - #{desc => ?DESC("desc_node_status")} )} ]. diff --git a/apps/emqx_bridge/src/schema/emqx_bridge_webhook_schema.erl b/apps/emqx_bridge/src/schema/emqx_bridge_webhook_schema.erl index 0495911e7..b495436a4 100644 --- a/apps/emqx_bridge/src/schema/emqx_bridge_webhook_schema.erl +++ b/apps/emqx_bridge/src/schema/emqx_bridge_webhook_schema.erl @@ -38,7 +38,7 @@ fields("post") -> fields("put") -> fields("config"); fields("get") -> - emqx_bridge_schema:metrics_status_fields() ++ fields("post"); + emqx_bridge_schema:status_fields() ++ fields("post"); fields("creation_opts") -> lists:filter( fun({K, _V}) -> diff --git a/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl index a6b5ece89..eb86b923f 100644 --- a/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl +++ b/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl @@ -187,8 +187,6 @@ t_http_crud_apis(Config) -> <<"enable">> := true, <<"status">> := _, <<"node_status">> := [_ | _], - <<"metrics">> := _, - <<"node_metrics">> := [_ | _], <<"url">> := URL1 } = jsx:decode(Bridge), @@ -225,8 +223,6 @@ t_http_crud_apis(Config) -> <<"enable">> := true, <<"status">> := _, <<"node_status">> := [_ | _], - <<"metrics">> := _, - <<"node_metrics">> := [_ | _], <<"url">> := URL2 }, jsx:decode(Bridge2) @@ -259,8 +255,6 @@ t_http_crud_apis(Config) -> <<"enable">> := true, <<"status">> := _, <<"node_status">> := [_ | _], - <<"metrics">> := _, - <<"node_metrics">> := [_ | _], <<"url">> := URL2 }, jsx:decode(Bridge3Str) @@ -456,8 +450,6 @@ do_start_stop_bridges(Type, Config) -> <<"enable">> := true, <<"status">> := <<"connected">>, <<"node_status">> := [_ | _], - <<"metrics">> := _, - <<"node_metrics">> := [_ | _], <<"url">> := URL1 } = jsx:decode(Bridge), BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE, Name), @@ -502,8 +494,6 @@ t_enable_disable_bridges(Config) -> <<"enable">> := true, <<"status">> := <<"connected">>, <<"node_status">> := [_ | _], - <<"metrics">> := _, - <<"node_metrics">> := [_ | _], <<"url">> := URL1 } = jsx:decode(Bridge), BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE, Name), @@ -555,12 +545,10 @@ t_reset_bridges(Config) -> <<"enable">> := true, <<"status">> := <<"connected">>, <<"node_status">> := [_ | _], - <<"metrics">> := _, - <<"node_metrics">> := [_ | _], <<"url">> := URL1 } = jsx:decode(Bridge), BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE, Name), - {ok, 200, <<"Reset success">>} = request(put, uri(["bridges", BridgeID, "reset_metrics"]), []), + {ok, 200, <<"Reset success">>} = request(put, uri(["bridges", BridgeID, "metrics/reset"]), []), %% delete the bridge {ok, 204, <<>>} = request(delete, uri(["bridges", BridgeID]), []), @@ -599,6 +587,88 @@ t_with_redact_update(_Config) -> ?assertEqual(Password, Value), ok. +t_metrics(Config) -> + Port = ?config(port, Config), + %% assert we there's no bridges at first + {ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []), + + %% then we add a webhook bridge, using POST + %% POST /bridges/ will create a bridge + URL1 = ?URL(Port, "path1"), + Name = ?BRIDGE_NAME, + {ok, 201, Bridge} = request( + post, + uri(["bridges"]), + ?HTTP_BRIDGE(URL1, ?BRIDGE_TYPE, Name) + ), + + %ct:pal("---bridge: ~p", [Bridge]), + #{ + <<"type">> := ?BRIDGE_TYPE, + <<"name">> := Name, + <<"enable">> := true, + <<"status">> := _, + <<"node_status">> := [_ | _], + <<"url">> := URL1 + } = jsx:decode(Bridge), + + BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE, Name), + + %% check for empty bridge metrics + {ok, 200, Bridge1Str} = request(get, uri(["bridges", BridgeID, "metrics"]), []), + ct:pal("HERE ~p", [jsx:decode(Bridge1Str)]), + ?assertMatch( + #{ + <<"metrics">> := #{<<"success">> := 0}, + <<"node_metrics">> := [_ | _] + }, + jsx:decode(Bridge1Str) + ), + + %% send an message to emqx and the message should be forwarded to the HTTP server + Body = <<"my msg">>, + emqx:publish(emqx_message:make(<<"emqx_webhook/1">>, Body)), + ?assert( + receive + {http_server, received, #{ + method := <<"POST">>, + path := <<"/path1">>, + body := Body + }} -> + true; + Msg -> + ct:pal("error: http got unexpected request: ~p", [Msg]), + false + after 100 -> + false + end + ), + + %% check for non-empty bridge metrics + {ok, 200, Bridge2Str} = request(get, uri(["bridges", BridgeID, "metrics"]), []), + ct:pal("HERE ~p", [jsx:decode(Bridge2Str)]), + ?assertMatch( + #{ + <<"metrics">> := #{<<"success">> := 1}, + <<"node_metrics">> := [_ | _] + }, + jsx:decode(Bridge2Str) + ), + + %% check for non-empty metrics when listing all bridges + {ok, 200, BridgesStr} = request(get, uri(["bridges"]), []), + ct:pal("HERE ~p", [jsx:decode(BridgesStr)]), + ?assertMatch( + [ + #{ + <<"metrics">> := #{<<"success">> := 1}, + <<"node_metrics">> := [_ | _] + } + ], + jsx:decode(BridgesStr) + ), + ok. + operation_path(node, Oper, BridgeID) -> uri(["nodes", node(), "bridges", BridgeID, Oper]); operation_path(cluster, Oper, BridgeID) -> diff --git a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_gcp_pubsub.erl b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_gcp_pubsub.erl index 760aba9e1..83fe31b49 100644 --- a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_gcp_pubsub.erl +++ b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_gcp_pubsub.erl @@ -124,7 +124,7 @@ fields(bridge_config) -> )} ]; fields("get") -> - emqx_bridge_schema:metrics_status_fields() ++ fields("post"); + emqx_bridge_schema:status_fields() ++ fields("post"); fields("post") -> [type_field(), name_field() | fields("config")]; fields("put") -> diff --git a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_hstreamdb.erl b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_hstreamdb.erl index 135087929..6e0c711b2 100644 --- a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_hstreamdb.erl +++ b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_hstreamdb.erl @@ -67,7 +67,7 @@ fields("post") -> fields("put") -> fields("config"); fields("get") -> - emqx_bridge_schema:metrics_status_fields() ++ fields("post"). + emqx_bridge_schema:status_fields() ++ fields("post"). field(connector) -> mk( diff --git a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_influxdb.erl b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_influxdb.erl index 62c8b6ab7..fece72e82 100644 --- a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_influxdb.erl +++ b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_influxdb.erl @@ -139,7 +139,7 @@ method_fileds(get, ConnectorType) -> influxdb_bridge_common_fields() ++ connector_fields(ConnectorType) ++ type_name_fields(ConnectorType) ++ - emqx_bridge_schema:metrics_status_fields(); + emqx_bridge_schema:status_fields(); method_fileds(put, ConnectorType) -> influxdb_bridge_common_fields() ++ connector_fields(ConnectorType). diff --git a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_kafka.erl b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_kafka.erl index 3f2f6a85f..9fae4f30a 100644 --- a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_kafka.erl +++ b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_kafka.erl @@ -67,7 +67,7 @@ fields("post") -> fields("put") -> fields("config"); fields("get") -> - emqx_bridge_schema:metrics_status_fields() ++ fields("post"); + emqx_bridge_schema:status_fields() ++ fields("post"); fields("config") -> [ {enable, mk(boolean(), #{desc => ?DESC("config_enable"), default => true})}, diff --git a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_mongodb.erl b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_mongodb.erl index bb4082681..eaf2b7da1 100644 --- a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_mongodb.erl +++ b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_mongodb.erl @@ -59,15 +59,15 @@ fields("put_sharded") -> fields("put_single") -> fields(mongodb_single); fields("get_rs") -> - emqx_bridge_schema:metrics_status_fields() ++ + emqx_bridge_schema:status_fields() ++ fields(mongodb_rs) ++ type_and_name_fields(mongodb_rs); fields("get_sharded") -> - emqx_bridge_schema:metrics_status_fields() ++ + emqx_bridge_schema:status_fields() ++ fields(mongodb_sharded) ++ type_and_name_fields(mongodb_sharded); fields("get_single") -> - emqx_bridge_schema:metrics_status_fields() ++ + emqx_bridge_schema:status_fields() ++ fields(mongodb_single) ++ type_and_name_fields(mongodb_single). diff --git a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_mysql.erl b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_mysql.erl index 114459149..71f8a8399 100644 --- a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_mysql.erl +++ b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_mysql.erl @@ -104,7 +104,7 @@ fields("post") -> fields("put") -> fields("config"); fields("get") -> - emqx_bridge_schema:metrics_status_fields() ++ fields("post"). + emqx_bridge_schema:status_fields() ++ fields("post"). desc("config") -> ?DESC("desc_config"); diff --git a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_pgsql.erl b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_pgsql.erl index be9fc9dc8..7e21d4dd7 100644 --- a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_pgsql.erl +++ b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_pgsql.erl @@ -106,7 +106,7 @@ fields("post") -> fields("put") -> fields("config"); fields("get") -> - emqx_bridge_schema:metrics_status_fields() ++ fields("post"). + emqx_bridge_schema:status_fields() ++ fields("post"). fields("post", Type) -> [type_field(Type), name_field() | fields("config")]. diff --git a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_redis.erl b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_redis.erl index 5c273e050..3a3963786 100644 --- a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_redis.erl +++ b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_redis.erl @@ -126,7 +126,7 @@ method_fileds(get, ConnectorType) -> redis_bridge_common_fields() ++ connector_fields(ConnectorType) ++ type_name_fields(ConnectorType) ++ - emqx_bridge_schema:metrics_status_fields(); + emqx_bridge_schema:status_fields(); method_fileds(put, ConnectorType) -> redis_bridge_common_fields() ++ connector_fields(ConnectorType). From 42f42de4d9fad1666d1b76d758623d3f4dbc2edf Mon Sep 17 00:00:00 2001 From: Erik Timan Date: Thu, 12 Jan 2023 19:32:38 +0100 Subject: [PATCH 3/9] feat(emqx_bridge): add separate endpoint for enable/disable of bridge In order to improve the consistency with other API endpoints, we move the enable/disable operations to a separate endpoint /bridges/{id}/enable/[true,false]. --- apps/emqx_bridge/i18n/emqx_bridge_api.conf | 35 +++++++-- apps/emqx_bridge/src/emqx_bridge_api.erl | 75 ++++++++++++++----- .../test/emqx_bridge_api_SUITE.erl | 13 ++-- 3 files changed, 93 insertions(+), 30 deletions(-) diff --git a/apps/emqx_bridge/i18n/emqx_bridge_api.conf b/apps/emqx_bridge/i18n/emqx_bridge_api.conf index 796cbb91c..8adda9355 100644 --- a/apps/emqx_bridge/i18n/emqx_bridge_api.conf +++ b/apps/emqx_bridge/i18n/emqx_bridge_api.conf @@ -2,8 +2,8 @@ emqx_bridge_api { desc_param_path_operation_cluster { desc { - en: """Operations can be one of: enable, disable, start, stop, restart""" - zh: """集群可用操作:启用、禁用、启动、停止、重新启动""" + en: """Operations can be one of: start, stop, restart""" + zh: """""" } label: { en: "Cluster Operation" @@ -44,6 +44,16 @@ emqx_bridge_api { } } + desc_param_path_enable { + desc { + en: """Whether or not the bridge is enabled""" + zh: """""" + } + label: { + en: "Enable bridge" + zh: "" + } + } desc_api1 { desc { en: """List all created bridges""" @@ -112,8 +122,8 @@ emqx_bridge_api { desc_api7 { desc { - en: """Enable/Disable/Stop/Restart bridges on all nodes in the cluster.""" - zh: """在集群中的所有节点上启用/禁用/停止/重新启动 Bridge。""" + en: """Stop/Restart bridges on all nodes in the cluster.""" + zh: """""" } label: { en: "Cluster Bridge Operate" @@ -123,10 +133,8 @@ emqx_bridge_api { desc_api8 { desc { - en: """Stop/Restart bridges on a specific node. - NOTE: It's not allowed to disable/enable bridges on a single node.""" - zh: """在某个节点上停止/重新启动 Bridge。 -NOTE:不允许在单节点上启用/禁用 Bridge""" + en: """Stop/Restart bridges on a specific node.""" + zh: """在某个节点上停止/重新启动 Bridge。""" } label: { en: "Node Bridge Operate" @@ -144,4 +152,15 @@ NOTE:不允许在单节点上启用/禁用 Bridge""" zh: "" } } + + desc_enable_bridge { + desc { + en: """Enable or Disable bridges on all nodes in the cluster.""" + zh: """""" + } + label: { + en: "Cluster Bridge Enable" + zh: "" + } + } } diff --git a/apps/emqx_bridge/src/emqx_bridge_api.erl b/apps/emqx_bridge/src/emqx_bridge_api.erl index b05e31b11..f3247206e 100644 --- a/apps/emqx_bridge/src/emqx_bridge_api.erl +++ b/apps/emqx_bridge/src/emqx_bridge_api.erl @@ -36,6 +36,7 @@ -export([ '/bridges'/2, '/bridges/:id'/2, + '/bridges/:id/enable/:enable'/2, '/bridges/:id/:operation'/2, '/nodes/:node/bridges/:id/:operation'/2, '/bridges/:id/metrics'/2, @@ -67,6 +68,7 @@ paths() -> [ "/bridges", "/bridges/:id", + "/bridges/:id/enable/:enable", "/bridges/:id/:operation", "/nodes/:node/bridges/:id/:operation", "/bridges/:id/metrics", @@ -89,7 +91,7 @@ get_response_body_schema() -> param_path_operation_cluster() -> {operation, mk( - enum([enable, disable, stop, restart]), + enum([stop, restart]), #{ in => path, required => true, @@ -134,6 +136,17 @@ param_path_id() -> } )}. +param_path_enable() -> + {enable, + mk( + boolean(), + #{ + in => path, + desc => ?DESC("desc_param_path_enable"), + example => true + } + )}. + bridge_info_array_example(Method, WithMetrics) -> [Config || #{value := Config} <- maps:values(bridge_info_examples(Method, WithMetrics))]. @@ -367,12 +380,29 @@ schema("/bridges/:id/metrics/reset") -> } } }; +schema("/bridges/:id/enable/:enable") -> + #{ + 'operationId' => '/bridges/:id/enable/:enable', + put => + #{ + tags => [<<"bridges">>], + summary => <<"Enable or Disable Bridge">>, + desc => ?DESC("desc_enable_bridge"), + parameters => [param_path_id(), param_path_enable()], + responses => + #{ + 204 => <<"Success">>, + 400 => error_schema('INVALID_ID', "Bad bridge ID"), + 503 => error_schema('SERVICE_UNAVAILABLE', "Service unavailable") + } + } + }; schema("/bridges/:id/:operation") -> #{ 'operationId' => '/bridges/:id/:operation', post => #{ tags => [<<"bridges">>], - summary => <<"Enable/Disable/Stop/Restart Bridge">>, + summary => <<"Stop or Restart Bridge">>, description => ?DESC("desc_api7"), parameters => [ param_path_id(), @@ -515,6 +545,28 @@ lookup_from_local_node(BridgeType, BridgeName) -> Error -> Error end. +'/bridges/:id/enable/:enable'(put, #{bindings := #{id := Id, enable := Enable}}) -> + ?TRY_PARSE_ID( + Id, + case enable_func(Enable) of + invalid -> + {400, error_msg('BAD_REQUEST', <<"invalid operation">>)}; + OperFunc -> + case emqx_bridge:disable_enable(OperFunc, BridgeType, BridgeName) of + {ok, _} -> + {204}; + {error, {pre_config_update, _, bridge_not_found}} -> + {404, error_msg('NOT_FOUND', <<"bridge not found">>)}; + {error, {_, _, timeout}} -> + {503, error_msg('SERVICE_UNAVAILABLE', <<"request timeout">>)}; + {error, timeout} -> + {503, error_msg('SERVICE_UNAVAILABLE', <<"request timeout">>)}; + {error, Reason} -> + {500, error_msg('INTERNAL_ERROR', Reason)} + end + end + ). + '/bridges/:id/:operation'(post, #{ bindings := #{id := Id, operation := Op} @@ -524,19 +576,6 @@ lookup_from_local_node(BridgeType, BridgeName) -> case operation_func(Op) of invalid -> {400, error_msg('BAD_REQUEST', <<"invalid operation">>)}; - OperFunc when OperFunc == enable; OperFunc == disable -> - case emqx_bridge:disable_enable(OperFunc, BridgeType, BridgeName) of - {ok, _} -> - {200}; - {error, {pre_config_update, _, bridge_not_found}} -> - {404, error_msg('NOT_FOUND', <<"bridge not found">>)}; - {error, {_, _, timeout}} -> - {503, error_msg('SERVICE_UNAVAILABLE', <<"request timeout">>)}; - {error, timeout} -> - {503, error_msg('SERVICE_UNAVAILABLE', <<"request timeout">>)}; - {error, Reason} -> - {500, error_msg('INTERNAL_ERROR', Reason)} - end; OperFunc -> Nodes = mria_mnesia:running_nodes(), operation_to_all_nodes(Nodes, OperFunc, BridgeType, BridgeName) @@ -573,10 +612,12 @@ node_operation_func(_) -> invalid. operation_func(<<"stop">>) -> stop; operation_func(<<"restart">>) -> restart; -operation_func(<<"enable">>) -> enable; -operation_func(<<"disable">>) -> disable; operation_func(_) -> invalid. +enable_func(<<"true">>) -> enable; +enable_func(<<"false">>) -> disable; +enable_func(_) -> invalid. + operation_to_all_nodes(Nodes, OperFunc, BridgeType, BridgeName) -> RpcFunc = case OperFunc of diff --git a/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl index eb86b923f..4650ea1ad 100644 --- a/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl +++ b/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl @@ -498,19 +498,19 @@ t_enable_disable_bridges(Config) -> } = jsx:decode(Bridge), BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE, Name), %% disable it - {ok, 200, <<>>} = request(post, operation_path(cluster, disable, BridgeID), <<"">>), + {ok, 204, <<>>} = request(put, enable_path(false, BridgeID), <<"">>), {ok, 200, Bridge2} = request(get, uri(["bridges", BridgeID]), []), ?assertMatch(#{<<"status">> := <<"stopped">>}, jsx:decode(Bridge2)), %% enable again - {ok, 200, <<>>} = request(post, operation_path(cluster, enable, BridgeID), <<"">>), + {ok, 204, <<>>} = request(put, enable_path(true, BridgeID), <<"">>), {ok, 200, Bridge3} = request(get, uri(["bridges", BridgeID]), []), ?assertMatch(#{<<"status">> := <<"connected">>}, jsx:decode(Bridge3)), %% enable an already started bridge - {ok, 200, <<>>} = request(post, operation_path(cluster, enable, BridgeID), <<"">>), + {ok, 204, <<>>} = request(put, enable_path(true, BridgeID), <<"">>), {ok, 200, Bridge3} = request(get, uri(["bridges", BridgeID]), []), ?assertMatch(#{<<"status">> := <<"connected">>}, jsx:decode(Bridge3)), %% disable it again - {ok, 200, <<>>} = request(post, operation_path(cluster, disable, BridgeID), <<"">>), + {ok, 204, <<>>} = request(put, enable_path(false, BridgeID), <<"">>), {ok, 403, Res} = request(post, operation_path(node, restart, BridgeID), <<"">>), ?assertEqual( @@ -519,7 +519,7 @@ t_enable_disable_bridges(Config) -> ), %% enable a stopped bridge - {ok, 200, <<>>} = request(post, operation_path(cluster, enable, BridgeID), <<"">>), + {ok, 204, <<>>} = request(put, enable_path(true, BridgeID), <<"">>), {ok, 200, Bridge4} = request(get, uri(["bridges", BridgeID]), []), ?assertMatch(#{<<"status">> := <<"connected">>}, jsx:decode(Bridge4)), %% delete the bridge @@ -674,5 +674,8 @@ operation_path(node, Oper, BridgeID) -> operation_path(cluster, Oper, BridgeID) -> uri(["bridges", BridgeID, Oper]). +enable_path(Enable, BridgeID) -> + uri(["bridges", BridgeID, "enable", Enable]). + str(S) when is_list(S) -> S; str(S) when is_binary(S) -> binary_to_list(S). From 0fd6865c41c8d66d5184c238eebf945b339b2c4f Mon Sep 17 00:00:00 2001 From: Erik Timan Date: Thu, 12 Jan 2023 19:51:25 +0100 Subject: [PATCH 4/9] chore: add changes --- changes/v5.0.15/feat-9736.en.md | 5 +++++ changes/v5.0.15/feat-9736.zh.md | 5 +++++ 2 files changed, 10 insertions(+) create mode 100644 changes/v5.0.15/feat-9736.en.md create mode 100644 changes/v5.0.15/feat-9736.zh.md diff --git a/changes/v5.0.15/feat-9736.en.md b/changes/v5.0.15/feat-9736.en.md new file mode 100644 index 000000000..59d7bd558 --- /dev/null +++ b/changes/v5.0.15/feat-9736.en.md @@ -0,0 +1,5 @@ +Refactor of /bridges API to make it more consistent with other APIs: +- bridge enable/disable is now done via the endpoint `/bridges/{id}/enable/[true,false]` +- `/bridges/{id}/operation/{operation}` endpoints are now `/bridges/{id}/{operation}` +- metrics are moved out from the GET `/bridges/{id}` response and can now be fetched via `/bridges/{id}/metrics` +- the `bridges/{id}/reset_metrics` endpoint is now `/bridges/{id}/metrics/reset` diff --git a/changes/v5.0.15/feat-9736.zh.md b/changes/v5.0.15/feat-9736.zh.md new file mode 100644 index 000000000..0107c8ab6 --- /dev/null +++ b/changes/v5.0.15/feat-9736.zh.md @@ -0,0 +1,5 @@ +重构部分 /bridges 的API 使得其和其他 API 能够更加一致: +- 桥接的启用和禁用现在是通过 `/bridges/{id}/enable/[true,false]` API 来实现的 +- 使用 `/bridges/{id}/{operation}` 替换了旧的 `/bridges/{id}/operation/{operation}` API +- 指标数据从 `/bridges/{id}` 的响应消息中移除,现在可以使用新的 API `/bridges/{id}/metrics` 进行访问 +- 使用 `/bridges/{id}/metrics/reset` 替换了旧的 `bridges/{id}/reset_metrics` API From f1c58c34ed124f597fdfbcf6d314233ba34a2262 Mon Sep 17 00:00:00 2001 From: Erik Timan Date: Fri, 13 Jan 2023 09:57:33 +0100 Subject: [PATCH 5/9] test(emqx_bridge): fix fetching of metrics in emqx_bridge_mqtt_SUITE --- .../test/emqx_bridge_mqtt_SUITE.erl | 38 ++++++++++--------- 1 file changed, 21 insertions(+), 17 deletions(-) diff --git a/apps/emqx_bridge/test/emqx_bridge_mqtt_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_mqtt_SUITE.erl index ae4fc4692..3040789b3 100644 --- a/apps/emqx_bridge/test/emqx_bridge_mqtt_SUITE.erl +++ b/apps/emqx_bridge/test/emqx_bridge_mqtt_SUITE.erl @@ -187,7 +187,7 @@ t_mqtt_conn_bridge_ingress(_) -> ), %% verify the metrics of the bridge - {ok, 200, BridgeStr} = request(get, uri(["bridges", BridgeIDIngress]), []), + {ok, 200, BridgeMetricsStr} = request(get, uri(["bridges", BridgeIDIngress, "metrics"]), []), ?assertMatch( #{ <<"metrics">> := #{<<"matched">> := 0, <<"received">> := 1}, @@ -200,7 +200,7 @@ t_mqtt_conn_bridge_ingress(_) -> } ] }, - jsx:decode(BridgeStr) + jsx:decode(BridgeMetricsStr) ), %% delete the bridge @@ -255,7 +255,7 @@ t_mqtt_conn_bridge_egress(_) -> ), %% verify the metrics of the bridge - {ok, 200, BridgeStr} = request(get, uri(["bridges", BridgeIDEgress]), []), + {ok, 200, BridgeMetricsStr} = request(get, uri(["bridges", BridgeIDEgress, "metrics"]), []), ?assertMatch( #{ <<"metrics">> := #{<<"matched">> := 1, <<"success">> := 1, <<"failed">> := 0}, @@ -268,7 +268,7 @@ t_mqtt_conn_bridge_egress(_) -> } ] }, - jsx:decode(BridgeStr) + jsx:decode(BridgeMetricsStr) ), %% delete the bridge @@ -354,7 +354,7 @@ t_mqtt_conn_bridge_ingress_and_egress(_) -> Payload = <<"hello">>, emqx:subscribe(RemoteTopic), - {ok, 200, BridgeStr1} = request(get, uri(["bridges", BridgeIDEgress]), []), + {ok, 200, BridgeMetricsStr1} = request(get, uri(["bridges", BridgeIDEgress, "metrics"]), []), #{ <<"metrics">> := #{ <<"matched">> := CntMatched1, <<"success">> := CntSuccess1, <<"failed">> := 0 @@ -371,7 +371,7 @@ t_mqtt_conn_bridge_ingress_and_egress(_) -> } } ] - } = jsx:decode(BridgeStr1), + } = jsx:decode(BridgeMetricsStr1), timer:sleep(100), %% PUBLISH a message to the 'local' broker, as we have only one broker, %% the remote broker is also the local one. @@ -393,7 +393,7 @@ t_mqtt_conn_bridge_ingress_and_egress(_) -> %% verify the metrics of the bridge timer:sleep(1000), - {ok, 200, BridgeStr2} = request(get, uri(["bridges", BridgeIDEgress]), []), + {ok, 200, BridgeMetricsStr2} = request(get, uri(["bridges", BridgeIDEgress, "metrics"]), []), #{ <<"metrics">> := #{ <<"matched">> := CntMatched2, <<"success">> := CntSuccess2, <<"failed">> := 0 @@ -410,7 +410,7 @@ t_mqtt_conn_bridge_ingress_and_egress(_) -> } } ] - } = jsx:decode(BridgeStr2), + } = jsx:decode(BridgeMetricsStr2), ?assertEqual(CntMatched2, CntMatched1 + 1), ?assertEqual(CntSuccess2, CntSuccess1 + 1), ?assertEqual(NodeCntMatched2, NodeCntMatched1 + 1), @@ -513,7 +513,7 @@ t_ingress_mqtt_bridge_with_rules(_) -> ), %% verify the metrics of the bridge - {ok, 200, BridgeStr} = request(get, uri(["bridges", BridgeIDIngress]), []), + {ok, 200, BridgeMetricsStr} = request(get, uri(["bridges", BridgeIDIngress, "metrics"]), []), ?assertMatch( #{ <<"metrics">> := #{<<"matched">> := 0, <<"received">> := 1}, @@ -526,7 +526,7 @@ t_ingress_mqtt_bridge_with_rules(_) -> } ] }, - jsx:decode(BridgeStr) + jsx:decode(BridgeMetricsStr) ), {ok, 204, <<>>} = request(delete, uri(["rules", RuleId]), []), @@ -627,7 +627,7 @@ t_egress_mqtt_bridge_with_rules(_) -> ), %% verify the metrics of the bridge - {ok, 200, BridgeStr} = request(get, uri(["bridges", BridgeIDEgress]), []), + {ok, 200, BridgeMetricsStr} = request(get, uri(["bridges", BridgeIDEgress, "metrics"]), []), ?assertMatch( #{ <<"metrics">> := #{<<"matched">> := 2, <<"success">> := 2, <<"failed">> := 0}, @@ -641,7 +641,7 @@ t_egress_mqtt_bridge_with_rules(_) -> } ] }, - jsx:decode(BridgeStr) + jsx:decode(BridgeMetricsStr) ), {ok, 204, <<>>} = request(delete, uri(["rules", RuleId]), []), @@ -693,7 +693,7 @@ t_mqtt_conn_bridge_egress_reconnect(_) -> assert_mqtt_msg_received(RemoteTopic, Payload0), %% verify the metrics of the bridge - {ok, 200, BridgeStr} = request(get, uri(["bridges", BridgeIDEgress]), []), + {ok, 200, BridgeMetricsStr} = request(get, uri(["bridges", BridgeIDEgress, "metrics"]), []), ?assertMatch( #{ <<"metrics">> := #{<<"matched">> := 1, <<"success">> := 1, <<"failed">> := 0}, @@ -706,7 +706,7 @@ t_mqtt_conn_bridge_egress_reconnect(_) -> } ] }, - jsx:decode(BridgeStr) + jsx:decode(BridgeMetricsStr) ), %% stop the listener 1883 to make the bridge disconnected @@ -740,7 +740,9 @@ t_mqtt_conn_bridge_egress_reconnect(_) -> %% verify the metrics of the bridge, the message should be queued {ok, 200, BridgeStr1} = request(get, uri(["bridges", BridgeIDEgress]), []), + {ok, 200, BridgeMetricsStr1} = request(get, uri(["bridges", BridgeIDEgress, "metrics"]), []), Decoded1 = jsx:decode(BridgeStr1), + DecodedMetrics1 = jsx:decode(BridgeMetricsStr1), ?assertMatch( Status when (Status == <<"connected">> orelse Status == <<"connecting">>), maps:get(<<"status">>, Decoded1) @@ -753,7 +755,7 @@ t_mqtt_conn_bridge_egress_reconnect(_) -> <<"failed">> := 0, <<"queuing">> := 2 } when Matched >= 3, - maps:get(<<"metrics">>, Decoded1) + maps:get(<<"metrics">>, DecodedMetrics1) ), %% start the listener 1883 to make the bridge reconnected @@ -761,10 +763,12 @@ t_mqtt_conn_bridge_egress_reconnect(_) -> timer:sleep(1500), %% verify the metrics of the bridge, the 2 queued messages should have been sent {ok, 200, BridgeStr2} = request(get, uri(["bridges", BridgeIDEgress]), []), + {ok, 200, BridgeMetricsStr2} = request(get, uri(["bridges", BridgeIDEgress, "metrics"]), []), + Decoded2 = jsx:decode(BridgeStr2), + ?assertEqual(<<"connected">>, maps:get(<<"status">>, Decoded2)), %% matched >= 3 because of possible retries. ?assertMatch( #{ - <<"status">> := <<"connected">>, <<"metrics">> := #{ <<"matched">> := Matched, <<"success">> := 3, @@ -773,7 +777,7 @@ t_mqtt_conn_bridge_egress_reconnect(_) -> <<"retried">> := _ } } when Matched >= 3, - jsx:decode(BridgeStr2) + jsx:decode(BridgeMetricsStr2) ), %% also verify the 2 messages have been sent to the remote broker assert_mqtt_msg_received(RemoteTopic, Payload1), From 8dd52e5a6e92ce46790f45ca713c4b698a414671 Mon Sep 17 00:00:00 2001 From: Erik Timan Date: Fri, 13 Jan 2023 10:16:27 +0100 Subject: [PATCH 6/9] chore: add translations to schemas --- apps/emqx_bridge/i18n/emqx_bridge_api.conf | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/apps/emqx_bridge/i18n/emqx_bridge_api.conf b/apps/emqx_bridge/i18n/emqx_bridge_api.conf index 8adda9355..9c9200aeb 100644 --- a/apps/emqx_bridge/i18n/emqx_bridge_api.conf +++ b/apps/emqx_bridge/i18n/emqx_bridge_api.conf @@ -3,7 +3,7 @@ emqx_bridge_api { desc_param_path_operation_cluster { desc { en: """Operations can be one of: start, stop, restart""" - zh: """""" + zh: """集群可用操作:启用、启动、重新启动""" } label: { en: "Cluster Operation" @@ -47,11 +47,11 @@ emqx_bridge_api { desc_param_path_enable { desc { en: """Whether or not the bridge is enabled""" - zh: """""" + zh: """是否启用桥接""" } label: { en: "Enable bridge" - zh: "" + zh: "启用桥接" } } desc_api1 { @@ -123,7 +123,7 @@ emqx_bridge_api { desc_api7 { desc { en: """Stop/Restart bridges on all nodes in the cluster.""" - zh: """""" + zh: """停止或启用所有节点上的桥接""" } label: { en: "Cluster Bridge Operate" @@ -145,22 +145,22 @@ emqx_bridge_api { desc_bridge_metrics { desc { en: """Get bridge metrics by Id""" - zh: """""" + zh: """通过 Id 来获取桥接的指标信息""" } label: { en: "Get Bridge Metrics" - zh: "" + zh: "获取桥接的指标" } } desc_enable_bridge { desc { en: """Enable or Disable bridges on all nodes in the cluster.""" - zh: """""" + zh: """启用或禁用所有节点上的桥接""" } label: { en: "Cluster Bridge Enable" - zh: "" + zh: "是否启用集群内的桥接" } } } From 7a17fb7308d80498af53926f9ee118b9ce89fa23 Mon Sep 17 00:00:00 2001 From: Erik Timan Date: Fri, 13 Jan 2023 13:50:37 +0100 Subject: [PATCH 7/9] test(emqx_ee_bridge): fix bridge enable/disable in kafka producer suite --- .../emqx_bridge_impl_kafka_producer_SUITE.erl | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_producer_SUITE.erl b/lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_producer_SUITE.erl index bdde21c76..8e5b1fa95 100644 --- a/lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_producer_SUITE.erl +++ b/lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_producer_SUITE.erl @@ -220,9 +220,10 @@ kafka_bridge_rest_api_helper(Config) -> BridgeIdUrlEnc = BridgeType ++ UrlEscColon ++ BridgeName, BridgesParts = ["bridges"], BridgesPartsIdDeleteAlsoActions = ["bridges", BridgeIdUrlEnc ++ "?also_delete_dep_actions"], - OpUrlFun = fun(OpName) -> ["bridges", BridgeIdUrlEnc, "operation", OpName] end, - BridgesPartsOpDisable = OpUrlFun("disable"), - BridgesPartsOpEnable = OpUrlFun("enable"), + OpUrlFun = fun(OpName) -> ["bridges", BridgeIdUrlEnc, OpName] end, + EnableFun = fun(Enable) -> ["bridges", BridgeIdUrlEnc, "enable", Enable] end, + BridgesPartsOpDisable = EnableFun("false"), + BridgesPartsOpEnable = EnableFun("true"), BridgesPartsOpRestart = OpUrlFun("restart"), BridgesPartsOpStop = OpUrlFun("stop"), %% List bridges @@ -321,10 +322,10 @@ kafka_bridge_rest_api_helper(Config) -> ?assertEqual(0, emqx_resource_metrics:retried_failed_get(ResourceId)), ?assertEqual(0, emqx_resource_metrics:retried_success_get(ResourceId)), %% Perform operations - {ok, 200, _} = show(http_post(show(BridgesPartsOpDisable), #{})), - {ok, 200, _} = show(http_post(show(BridgesPartsOpDisable), #{})), - {ok, 200, _} = show(http_post(show(BridgesPartsOpEnable), #{})), - {ok, 200, _} = show(http_post(show(BridgesPartsOpEnable), #{})), + {ok, 204, _} = show(http_put(show(BridgesPartsOpDisable), #{})), + {ok, 204, _} = show(http_put(show(BridgesPartsOpDisable), #{})), + {ok, 204, _} = show(http_put(show(BridgesPartsOpEnable), #{})), + {ok, 204, _} = show(http_put(show(BridgesPartsOpEnable), #{})), {ok, 200, _} = show(http_post(show(BridgesPartsOpStop), #{})), {ok, 200, _} = show(http_post(show(BridgesPartsOpStop), #{})), {ok, 200, _} = show(http_post(show(BridgesPartsOpRestart), #{})), From c3133fb6a22d50bf569946f7e3790b80af697f37 Mon Sep 17 00:00:00 2001 From: Erik Timan Date: Fri, 13 Jan 2023 14:49:41 +0100 Subject: [PATCH 8/9] fix(emqx_bridge): small fixes from review --- apps/emqx_bridge/i18n/emqx_bridge_api.conf | 4 ++-- apps/emqx_bridge/src/emqx_bridge_api.erl | 3 ++- apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl | 13 ++++++++----- 3 files changed, 12 insertions(+), 8 deletions(-) diff --git a/apps/emqx_bridge/i18n/emqx_bridge_api.conf b/apps/emqx_bridge/i18n/emqx_bridge_api.conf index 9c9200aeb..a5593f1cf 100644 --- a/apps/emqx_bridge/i18n/emqx_bridge_api.conf +++ b/apps/emqx_bridge/i18n/emqx_bridge_api.conf @@ -2,8 +2,8 @@ emqx_bridge_api { desc_param_path_operation_cluster { desc { - en: """Operations can be one of: start, stop, restart""" - zh: """集群可用操作:启用、启动、重新启动""" + en: """Operations can be one of: stop, restart""" + zh: """集群可用操作:停止、重新启动""" } label: { en: "Cluster Operation" diff --git a/apps/emqx_bridge/src/emqx_bridge_api.erl b/apps/emqx_bridge/src/emqx_bridge_api.erl index f3247206e..3af911d3d 100644 --- a/apps/emqx_bridge/src/emqx_bridge_api.erl +++ b/apps/emqx_bridge/src/emqx_bridge_api.erl @@ -107,7 +107,7 @@ param_path_operation_on_node() -> #{ in => path, required => true, - example => <<"start">>, + example => <<"stop">>, desc => ?DESC("desc_param_path_operation_on_node") } )}. @@ -142,6 +142,7 @@ param_path_enable() -> boolean(), #{ in => path, + required => true, desc => ?DESC("desc_param_path_enable"), example => true } diff --git a/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl index 4650ea1ad..82523a839 100644 --- a/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl +++ b/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl @@ -616,7 +616,6 @@ t_metrics(Config) -> %% check for empty bridge metrics {ok, 200, Bridge1Str} = request(get, uri(["bridges", BridgeID, "metrics"]), []), - ct:pal("HERE ~p", [jsx:decode(Bridge1Str)]), ?assertMatch( #{ <<"metrics">> := #{<<"success">> := 0}, @@ -625,6 +624,12 @@ t_metrics(Config) -> jsx:decode(Bridge1Str) ), + %% check that the bridge doesn't contain metrics anymore + {ok, 200, Bridge2Str} = request(get, uri(["bridges", BridgeID]), []), + Decoded = jsx:decode(Bridge2Str), + ?assertNot(maps:is_key(<<"metrics">>, Decoded)), + ?assertNot(maps:is_key(<<"node_metrics">>, Decoded)), + %% send an message to emqx and the message should be forwarded to the HTTP server Body = <<"my msg">>, emqx:publish(emqx_message:make(<<"emqx_webhook/1">>, Body)), @@ -645,19 +650,17 @@ t_metrics(Config) -> ), %% check for non-empty bridge metrics - {ok, 200, Bridge2Str} = request(get, uri(["bridges", BridgeID, "metrics"]), []), - ct:pal("HERE ~p", [jsx:decode(Bridge2Str)]), + {ok, 200, Bridge3Str} = request(get, uri(["bridges", BridgeID, "metrics"]), []), ?assertMatch( #{ <<"metrics">> := #{<<"success">> := 1}, <<"node_metrics">> := [_ | _] }, - jsx:decode(Bridge2Str) + jsx:decode(Bridge3Str) ), %% check for non-empty metrics when listing all bridges {ok, 200, BridgesStr} = request(get, uri(["bridges"]), []), - ct:pal("HERE ~p", [jsx:decode(BridgesStr)]), ?assertMatch( [ #{ From 61e98900be2841f03152ddc29416271bb77e314a Mon Sep 17 00:00:00 2001 From: Erik Timan Date: Fri, 13 Jan 2023 15:13:35 +0100 Subject: [PATCH 9/9] chore: bump app vsn of emqx_resource --- apps/emqx_resource/src/emqx_resource.app.src | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apps/emqx_resource/src/emqx_resource.app.src b/apps/emqx_resource/src/emqx_resource.app.src index 00389261b..e3a37fd10 100644 --- a/apps/emqx_resource/src/emqx_resource.app.src +++ b/apps/emqx_resource/src/emqx_resource.app.src @@ -1,7 +1,7 @@ %% -*- mode: erlang -*- {application, emqx_resource, [ {description, "Manager for all external resources"}, - {vsn, "0.1.4"}, + {vsn, "0.1.5"}, {registered, []}, {mod, {emqx_resource_app, []}}, {applications, [