From 903ae9a6446e2643f9b8ced9de72220af5bba525 Mon Sep 17 00:00:00 2001 From: Stefan Strigler Date: Wed, 21 Dec 2022 10:07:18 +0100 Subject: [PATCH 01/22] style: fix typo in fun name --- apps/emqx/src/emqx_map_lib.erl | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/apps/emqx/src/emqx_map_lib.erl b/apps/emqx/src/emqx_map_lib.erl index 5455fe9e7..631c3914d 100644 --- a/apps/emqx/src/emqx_map_lib.erl +++ b/apps/emqx/src/emqx_map_lib.erl @@ -152,7 +152,7 @@ deep_convert(Val, _, _Args) -> -spec unsafe_atom_key_map(#{binary() | atom() => any()}) -> #{atom() => any()}. unsafe_atom_key_map(Map) -> - covert_keys_to_atom(Map, fun(K) -> binary_to_atom(K, utf8) end). + convert_keys_to_atom(Map, fun(K) -> binary_to_atom(K, utf8) end). -spec binary_key_map(map()) -> map(). binary_key_map(Map) -> @@ -167,7 +167,7 @@ binary_key_map(Map) -> -spec safe_atom_key_map(#{binary() | atom() => any()}) -> #{atom() => any()}. safe_atom_key_map(Map) -> - covert_keys_to_atom(Map, fun(K) -> binary_to_existing_atom(K, utf8) end). + convert_keys_to_atom(Map, fun(K) -> binary_to_existing_atom(K, utf8) end). -spec jsonable_map(map() | list()) -> map() | list(). jsonable_map(Map) -> @@ -221,7 +221,7 @@ binary_string(Val) -> Val. %%--------------------------------------------------------------------------- -covert_keys_to_atom(BinKeyMap, Conv) -> +convert_keys_to_atom(BinKeyMap, Conv) -> deep_convert( BinKeyMap, fun From 96ca0d9f499b591152241921e69feb1dd524dade Mon Sep 17 00:00:00 2001 From: Stefan Strigler Date: Tue, 20 Dec 2022 16:12:13 +0100 Subject: [PATCH 02/22] feat(emqx_bridge): add /bridges_probe API endpoint --- apps/emqx_bridge/i18n/emqx_bridge_api.conf | 16 ++++ apps/emqx_bridge/src/emqx_bridge_api.erl | 76 ++++++++++++++++++- apps/emqx_bridge/src/emqx_bridge_resource.erl | 6 +- .../test/emqx_bridge_api_SUITE.erl | 46 +++++++++++ 4 files changed, 140 insertions(+), 4 deletions(-) diff --git a/apps/emqx_bridge/i18n/emqx_bridge_api.conf b/apps/emqx_bridge/i18n/emqx_bridge_api.conf index e8bb2403a..28f3db324 100644 --- a/apps/emqx_bridge/i18n/emqx_bridge_api.conf +++ b/apps/emqx_bridge/i18n/emqx_bridge_api.conf @@ -134,4 +134,20 @@ NOTE:不允许在单节点上启用/禁用 Bridge""" } } + desc_api9 { + desc { + en: """ +Test creating a new bridge by given ID
+The ID must be of format '{type}:{name}' +""" + zh: """ +通过给定的 ID 测试创建一个新的桥接。
+ID 的格式必须为 ’{type}:{name}” +""" + } + label: { + en: "Test Bridge Creation" + zh: "测试桥接创建" + } + } } diff --git a/apps/emqx_bridge/src/emqx_bridge_api.erl b/apps/emqx_bridge/src/emqx_bridge_api.erl index 0d0d2e9ad..aa11ebc5d 100644 --- a/apps/emqx_bridge/src/emqx_bridge_api.erl +++ b/apps/emqx_bridge/src/emqx_bridge_api.erl @@ -38,7 +38,8 @@ '/bridges/:id'/2, '/bridges/:id/operation/:operation'/2, '/nodes/:node/bridges/:id/operation/:operation'/2, - '/bridges/:id/reset_metrics'/2 + '/bridges/:id/reset_metrics'/2, + '/bridges_probe'/2 ]). -export([lookup_from_local_node/2]). @@ -68,7 +69,8 @@ paths() -> "/bridges/:id", "/bridges/:id/operation/:operation", "/nodes/:node/bridges/:id/operation/:operation", - "/bridges/:id/reset_metrics" + "/bridges/:id/reset_metrics", + "/bridges_probe" ]. error_schema(Code, Message) when is_atom(Code) -> @@ -384,6 +386,23 @@ schema("/nodes/:node/bridges/:id/operation/:operation") -> 503 => error_schema('SERVICE_UNAVAILABLE', "Service unavailable") } } + }; +schema("/bridges_probe") -> + #{ + 'operationId' => '/bridges_probe', + post => #{ + tags => [<<"bridges">>], + desc => ?DESC("desc_api9"), + summary => <<"Test creating bridge">>, + 'requestBody' => emqx_dashboard_swagger:schema_with_examples( + emqx_bridge_schema:post_request(), + bridge_info_examples(post) + ), + responses => #{ + 204 => <<"Test bridge OK">>, + 400 => error_schema(['TEST_FAILED'], "bridge test failed") + } + } }. '/bridges'(post, #{body := #{<<"type">> := BridgeType, <<"name">> := BridgeName} = Conf0}) -> @@ -462,6 +481,59 @@ schema("/nodes/:node/bridges/:id/operation/:operation") -> end ). +'/bridges_probe'(post, Request) -> + RequestMeta = #{module => ?MODULE, method => post, path => "/bridges_probe"}, + case emqx_dashboard_swagger:filter_check_request_and_translate_body(Request, RequestMeta) of + {ok, #{body := #{<<"type">> := ConnType} = Params}} -> + case do_probe(ConnType, maps:remove(<<"type">>, Params)) of + ok -> + {204}; + {error, Error} -> + {400, error_msg('TEST_FAILED', Error)} + end; + BadRequest -> + BadRequest + end. + +do_probe(ConnType, Params) -> + case test_connection(host_and_port(ConnType, Params)) of + ok -> + emqx_bridge_resource:create_dry_run(ConnType, Params); + Error -> + Error + end. + +host_and_port(mqtt, #{<<"server">> := Server}) -> + Server; +host_and_port(webhook, #{<<"url">> := Url}) -> + {BaseUrl, _Path} = parse_url(Url), + {ok, #{host := Host, port := Port}} = emqx_http_lib:uri_parse(BaseUrl), + {Host, Port}; +host_and_port(_Unknown, _) -> + undefined. + +test_connection(undefined) -> + %% be friendly, it might fail later on with a 'timeout' error. + ok; +test_connection({Host, Port}) -> + case gen_tcp:connect(Host, Port, []) of + {ok, TestSocket} -> gen_tcp:close(TestSocket); + Error -> Error + end. + +parse_url(Url) -> + case string:split(Url, "//", leading) of + [Scheme, UrlRem] -> + case string:split(UrlRem, "/", leading) of + [HostPort, Path] -> + {iolist_to_binary([Scheme, "//", HostPort]), Path}; + [HostPort] -> + {iolist_to_binary([Scheme, "//", HostPort]), <<>>} + end; + [Url] -> + error({invalid_url, Url}) + end. + lookup_from_all_nodes(BridgeType, BridgeName, SuccCode) -> Nodes = mria_mnesia:running_nodes(), case is_ok(emqx_bridge_proto_v1:lookup_from_all_nodes(Nodes, BridgeType, BridgeName)) of diff --git a/apps/emqx_bridge/src/emqx_bridge_resource.erl b/apps/emqx_bridge/src/emqx_bridge_resource.erl index d1ce260c9..dce7b9f1a 100644 --- a/apps/emqx_bridge/src/emqx_bridge_resource.erl +++ b/apps/emqx_bridge/src/emqx_bridge_resource.erl @@ -213,14 +213,16 @@ recreate(Type, Name, Conf, Opts) -> Opts ). -create_dry_run(Type, Conf) -> +create_dry_run(Type, Conf0) -> TmpPath = iolist_to_binary(["bridges-create-dry-run:", emqx_misc:gen_id(8)]), + Conf = emqx_map_lib:safe_atom_key_map(Conf0), case emqx_connector_ssl:convert_certs(TmpPath, Conf) of {error, Reason} -> {error, Reason}; {ok, ConfNew} -> + ParseConf = parse_confs(bin(Type), TmpPath, ConfNew), Res = emqx_resource:create_dry_run_local( - bridge_to_resource_type(Type), ConfNew + bridge_to_resource_type(Type), ParseConf ), _ = maybe_clear_certs(TmpPath, ConfNew), Res diff --git a/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl index e533c78b0..a32019e41 100644 --- a/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl +++ b/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl @@ -596,6 +596,52 @@ t_with_redact_update(_Config) -> ?assertEqual(Password, Value), ok. +-define(MQTT_BRIDGE(Server), #{ + <<"server">> => Server, + <<"username">> => <<"user1">>, + <<"password">> => <<"">>, + <<"proto_ver">> => <<"v5">>, + <<"ssl">> => #{<<"enable">> => false}, + <<"type">> => <<"mqtt">>, + <<"name">> => <<"mqtt_egress_test_bridge">> +}). + +t_bridges_probe(Config) -> + Port = ?config(port, Config), + URL = ?URL(Port, "some_path"), + + {ok, 204, <<>>} = request( + post, + uri(["bridges_probe"]), + ?HTTP_BRIDGE(URL, ?BRIDGE_TYPE, ?BRIDGE_NAME) + ), + + %% second time with same name is ok since no real bridge created + {ok, 204, <<>>} = request( + post, + uri(["bridges_probe"]), + ?HTTP_BRIDGE(URL, ?BRIDGE_TYPE, ?BRIDGE_NAME) + ), + + {ok, 400, _} = request( + post, + uri(["bridges_probe"]), + ?HTTP_BRIDGE(<<"http://203.0.113.3:1234/foo">>, ?BRIDGE_TYPE, ?BRIDGE_NAME) + ), + + {ok, 204, _} = request( + post, + uri(["bridges_probe"]), + ?MQTT_BRIDGE(<<"127.0.0.1:1883">>) + ), + + {ok, 400, _} = request( + post, + uri(["bridges_probe"]), + ?MQTT_BRIDGE(<<"127.0.0.1:2883">>) + ), + ok. + request(Method, Url, Body) -> request(<<"bridge_admin">>, Method, Url, Body). From c42c99f94f53785ae424709a99cac7e347f8b621 Mon Sep 17 00:00:00 2001 From: Stefan Strigler Date: Wed, 21 Dec 2022 15:20:53 +0100 Subject: [PATCH 03/22] fix: set a timeout for tcp connect --- apps/emqx_bridge/src/emqx_bridge_api.erl | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/apps/emqx_bridge/src/emqx_bridge_api.erl b/apps/emqx_bridge/src/emqx_bridge_api.erl index aa11ebc5d..8696aadaa 100644 --- a/apps/emqx_bridge/src/emqx_bridge_api.erl +++ b/apps/emqx_bridge/src/emqx_bridge_api.erl @@ -512,11 +512,12 @@ host_and_port(webhook, #{<<"url">> := Url}) -> host_and_port(_Unknown, _) -> undefined. +%% [TODO] remove in EMQX-8588 when resource manager handles things more elegantly test_connection(undefined) -> %% be friendly, it might fail later on with a 'timeout' error. ok; test_connection({Host, Port}) -> - case gen_tcp:connect(Host, Port, []) of + case gen_tcp:connect(Host, Port, [], 5000) of {ok, TestSocket} -> gen_tcp:close(TestSocket); Error -> Error end. From d55404cc99d9383a6b86ada419fe5280a0ddc67c Mon Sep 17 00:00:00 2001 From: Stefan Strigler Date: Wed, 21 Dec 2022 15:23:46 +0100 Subject: [PATCH 04/22] chore: add changelog --- changes/v5.0.13-en.md | 2 ++ changes/v5.0.13-zh.md | 2 ++ 2 files changed, 4 insertions(+) diff --git a/changes/v5.0.13-en.md b/changes/v5.0.13-en.md index 1ed3c798b..669275958 100644 --- a/changes/v5.0.13-en.md +++ b/changes/v5.0.13-en.md @@ -18,6 +18,8 @@ - Ensure the default expiration time of `banned` is large enough [#9599](https://github.com/emqx/emqx/pull/9599/). +- `/bridges_probe` API endpoint to test params for creating a new data bridge [#9585](https://github.com/emqx/emqx/pull/9585). + ## Bug fixes - Trigger `message.dropped` hook when QoS2 message is resend by client with a same packet id, or 'awaiting_rel' queue is full [#9487](https://github.com/emqx/emqx/pull/9487). diff --git a/changes/v5.0.13-zh.md b/changes/v5.0.13-zh.md index 41a1228f4..aae629a4a 100644 --- a/changes/v5.0.13-zh.md +++ b/changes/v5.0.13-zh.md @@ -18,6 +18,8 @@ - 确保黑名单的默认超期时间足够长 [#9599](https://github.com/emqx/emqx/pull/9599/)。 +- [FIXME] `/bridges_probe` API 端点用于测试创建新数据桥的参数 [#9585](https://github.com/emqx/emqx/pull/9585)。 + ## 修复 - 当 QoS2 消息被重发(使用相同 Packet ID),或当 'awaiting_rel' 队列已满时,触发消息丢弃钩子(`message.dropped`)及计数器 [#9487](https://github.com/emqx/emqx/pull/9487)。 From c85a988a4332b5fb1b1c0ad9684c8706901b6960 Mon Sep 17 00:00:00 2001 From: Stefan Strigler Date: Mon, 9 Jan 2023 12:02:38 +0100 Subject: [PATCH 05/22] fix: split 'server' param into host and port for mqtt bridge --- apps/emqx_bridge/src/emqx_bridge_api.erl | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/apps/emqx_bridge/src/emqx_bridge_api.erl b/apps/emqx_bridge/src/emqx_bridge_api.erl index 8696aadaa..851089acb 100644 --- a/apps/emqx_bridge/src/emqx_bridge_api.erl +++ b/apps/emqx_bridge/src/emqx_bridge_api.erl @@ -504,7 +504,10 @@ do_probe(ConnType, Params) -> end. host_and_port(mqtt, #{<<"server">> := Server}) -> - Server; + case string:split(Server, ":") of + [Host, Port] -> {Host, list_to_integer(Port)}; + _Other -> error(invalid_server, Server) + end; host_and_port(webhook, #{<<"url">> := Url}) -> {BaseUrl, _Path} = parse_url(Url), {ok, #{host := Host, port := Port}} = emqx_http_lib:uri_parse(BaseUrl), From f27f573109284d690082ed3fad062c52182b3a0a Mon Sep 17 00:00:00 2001 From: Stefan Strigler Date: Fri, 16 Dec 2022 14:46:13 +0100 Subject: [PATCH 06/22] refactor: move to /authorization/sources/built_in_database/rules --- .github/workflows/run_jmeter_tests.yaml | 10 +-- .../i18n/emqx_authz_api_mnesia_i18n.conf | 50 +++++++-------- apps/emqx_authz/src/emqx_authz.app.src | 2 +- apps/emqx_authz/src/emqx_authz_api_mnesia.erl | 54 ++++++++++------ .../test/emqx_authz_api_mnesia_SUITE.erl | 64 +++++++++---------- changes/v5.0.13-en.md | 2 + changes/v5.0.13-zh.md | 4 +- 7 files changed, 101 insertions(+), 85 deletions(-) diff --git a/.github/workflows/run_jmeter_tests.yaml b/.github/workflows/run_jmeter_tests.yaml index 6eaf4aa75..ba64b6d94 100644 --- a/.github/workflows/run_jmeter_tests.yaml +++ b/.github/workflows/run_jmeter_tests.yaml @@ -92,7 +92,7 @@ jobs: - uses: actions/checkout@v3 with: repository: emqx/emqx-fvt - ref: broker-autotest-v2 + ref: broker-autotest-v3 path: scripts - uses: actions/setup-java@v3 with: @@ -191,7 +191,7 @@ jobs: - uses: actions/checkout@v3 with: repository: emqx/emqx-fvt - ref: broker-autotest-v2 + ref: broker-autotest-v3 path: scripts - uses: actions/setup-java@v3 with: @@ -297,7 +297,7 @@ jobs: - uses: actions/checkout@v3 with: repository: emqx/emqx-fvt - ref: broker-autotest-v2 + ref: broker-autotest-v3 path: scripts - uses: actions/setup-java@v3 with: @@ -396,7 +396,7 @@ jobs: - uses: actions/checkout@v3 with: repository: emqx/emqx-fvt - ref: broker-autotest-v2 + ref: broker-autotest-v3 path: scripts - name: run jwks_server timeout-minutes: 10 @@ -496,7 +496,7 @@ jobs: - uses: actions/checkout@v3 with: repository: emqx/emqx-fvt - ref: broker-autotest-v2 + ref: broker-autotest-v3 path: scripts - uses: actions/setup-java@v3 with: diff --git a/apps/emqx_authz/i18n/emqx_authz_api_mnesia_i18n.conf b/apps/emqx_authz/i18n/emqx_authz_api_mnesia_i18n.conf index 50f644097..6d318d02b 100644 --- a/apps/emqx_authz/i18n/emqx_authz_api_mnesia_i18n.conf +++ b/apps/emqx_authz/i18n/emqx_authz_api_mnesia_i18n.conf @@ -1,28 +1,28 @@ emqx_authz_api_mnesia { users_username_get { desc { - en: """Show the list of record for username""" + en: """Show the list of rules for users""" zh: """获取内置数据库中所有用户名类型的规则记录""" } } users_username_post { desc { - en: """Add new records for username""" + en: """Add new rule for 'username'""" zh: """添加内置数据库中用户名类型的规则记录""" } } users_clientid_get { desc { - en: """Show the list of record for clientid""" + en: """Show the list of rules for clients""" zh: """获取内置数据库中所有客户端标识符类型的规则记录""" } } users_clientid_post { desc { - en: """Add new records for clientid""" + en: """Add new rule for 'clientid'""" zh: """添加内置数据库中客户端标识符类型的规则记录""" } } @@ -30,71 +30,71 @@ emqx_authz_api_mnesia { user_username_get { desc { - en: """Get record info for username""" + en: """Get rule for 'username'""" zh: """获取内置数据库中指定用户名类型的规则记录""" } } user_username_put { desc { - en: """Set record for username""" + en: """Set rule for 'username'""" zh: """更新内置数据库中指定用户名类型的规则记录""" } } user_username_delete { desc { - en: """Delete one record for username""" + en: """Delete rule for 'username'""" zh: """删除内置数据库中指定用户名类型的规则记录""" } } user_clientid_get { desc { - en: """Get record info for clientid""" + en: """Get rule for 'clientid'""" zh: """获取内置数据库中指定客户端标识符类型的规则记录""" } } user_clientid_put { desc { - en: """Set record for clientid""" + en: """Set rule for 'clientid'""" zh: """更新内置数据库中指定客户端标识符类型的规则记录""" } } user_clientid_delete { desc { - en: """Delete one record for clientid""" + en: """Delete rule for 'clientid'""" zh: """删除内置数据库中指定客户端标识符类型的规则记录""" } } - - rules_for_all_get { + rules_all_get { desc { - en: """Show the list of rules for all""" + en: """Show the list of rules for 'all'""" zh: """列出为所有客户端启用的规则列表""" } } - rules_for_all_post { + rules_all_post { desc { - en: """ -Create/Update the list of rules for all. -Set a empty list to clean up rules -""" - zh: """ -创建/更新 为所有客户端启用的规则列表。 -设为空列表以清楚所有规则 -""" + en: """Create/Update the list of rules for 'all'.""" + zh: """创建/更新 为所有客户端启用的规则列表。""" } } - purge_all_delete { + rules_all_delete { desc { - en: """Purge all records for username/clientid/all""" - zh: """清除所有内置数据库中的规则, 用户名/客户端标识符/所有""" + en: """Delete rules for 'all'""" + zh: """删除 `all` 规则""" + } + } + + rules_delete { + desc { + en: """Delete all rules for all 'users', 'clients' and 'all'""" + zh: """清除内置数据库中的所有类型('users' 、'clients' 、'all')的所有规则""" } } diff --git a/apps/emqx_authz/src/emqx_authz.app.src b/apps/emqx_authz/src/emqx_authz.app.src index c876fbf16..f5b9f9da6 100644 --- a/apps/emqx_authz/src/emqx_authz.app.src +++ b/apps/emqx_authz/src/emqx_authz.app.src @@ -1,7 +1,7 @@ %% -*- mode: erlang -*- {application, emqx_authz, [ {description, "An OTP application"}, - {vsn, "0.1.10"}, + {vsn, "0.1.11"}, {registered, []}, {mod, {emqx_authz_app, []}}, {applications, [ diff --git a/apps/emqx_authz/src/emqx_authz_api_mnesia.erl b/apps/emqx_authz/src/emqx_authz_api_mnesia.erl index 6a747496c..b39379b43 100644 --- a/apps/emqx_authz/src/emqx_authz_api_mnesia.erl +++ b/apps/emqx_authz/src/emqx_authz_api_mnesia.erl @@ -44,7 +44,7 @@ user/2, client/2, all/2, - purge/2 + rules/2 ]). %% query funs @@ -70,19 +70,19 @@ api_spec() -> paths() -> [ - "/authorization/sources/built_in_database/username", - "/authorization/sources/built_in_database/clientid", - "/authorization/sources/built_in_database/username/:username", - "/authorization/sources/built_in_database/clientid/:clientid", - "/authorization/sources/built_in_database/all", - "/authorization/sources/built_in_database/purge-all" + "/authorization/sources/built_in_database/rules/users", + "/authorization/sources/built_in_database/rules/clients", + "/authorization/sources/built_in_database/rules/users/:username", + "/authorization/sources/built_in_database/rules/clients/:clientid", + "/authorization/sources/built_in_database/rules/all", + "/authorization/sources/built_in_database/rules" ]. %%-------------------------------------------------------------------- %% Schema for each URI %%-------------------------------------------------------------------- -schema("/authorization/sources/built_in_database/username") -> +schema("/authorization/sources/built_in_database/rules/users") -> #{ 'operationId' => users, get => @@ -128,7 +128,7 @@ schema("/authorization/sources/built_in_database/username") -> } } }; -schema("/authorization/sources/built_in_database/clientid") -> +schema("/authorization/sources/built_in_database/rules/clients") -> #{ 'operationId' => clients, get => @@ -174,7 +174,7 @@ schema("/authorization/sources/built_in_database/clientid") -> } } }; -schema("/authorization/sources/built_in_database/username/:username") -> +schema("/authorization/sources/built_in_database/rules/users/:username") -> #{ 'operationId' => user, get => @@ -227,7 +227,7 @@ schema("/authorization/sources/built_in_database/username/:username") -> } } }; -schema("/authorization/sources/built_in_database/clientid/:clientid") -> +schema("/authorization/sources/built_in_database/rules/clients/:clientid") -> #{ 'operationId' => client, get => @@ -280,20 +280,20 @@ schema("/authorization/sources/built_in_database/clientid/:clientid") -> } } }; -schema("/authorization/sources/built_in_database/all") -> +schema("/authorization/sources/built_in_database/rules/all") -> #{ 'operationId' => all, get => #{ tags => [<<"authorization">>], - description => ?DESC(rules_for_all_get), + description => ?DESC(rules_all_get), responses => #{200 => swagger_with_example({rules, ?TYPE_REF}, {all, ?PUT_MAP_EXAMPLE})} }, post => #{ tags => [<<"authorization">>], - description => ?DESC(rules_for_all_post), + description => ?DESC(rules_all_post), 'requestBody' => swagger_with_example({rules, ?TYPE_REF}, {all, ?PUT_MAP_EXAMPLE}), responses => @@ -303,15 +303,24 @@ schema("/authorization/sources/built_in_database/all") -> [?BAD_REQUEST], <<"Bad rule schema">> ) } - } - }; -schema("/authorization/sources/built_in_database/purge-all") -> - #{ - 'operationId' => purge, + }, delete => #{ tags => [<<"authorization">>], - description => ?DESC(purge_all_delete), + description => ?DESC(rules_all_delete), + responses => + #{ + 204 => <<"Deleted">> + } + } + }; +schema("/authorization/sources/built_in_database/rules") -> + #{ + 'operationId' => rules, + delete => + #{ + tags => [<<"authorization">>], + description => ?DESC(rules_delete), responses => #{ 204 => <<"Deleted">>, @@ -555,9 +564,12 @@ all(get, _) -> end; all(post, #{body := #{<<"rules">> := Rules}}) -> emqx_authz_mnesia:store_rules(all, format_rules(Rules)), + {204}; +all(delete, _) -> + emqx_authz_mnesia:store_rules(all, []), {204}. -purge(delete, _) -> +rules(delete, _) -> case emqx_authz_api_sources:get_raw_source(<<"built_in_database">>) of [#{<<"enable">> := false}] -> ok = emqx_authz_mnesia:purge_rules(), diff --git a/apps/emqx_authz/test/emqx_authz_api_mnesia_SUITE.erl b/apps/emqx_authz/test/emqx_authz_api_mnesia_SUITE.erl index 62c747433..7b91f9b1c 100644 --- a/apps/emqx_authz/test/emqx_authz_api_mnesia_SUITE.erl +++ b/apps/emqx_authz/test/emqx_authz_api_mnesia_SUITE.erl @@ -70,21 +70,21 @@ t_api(_) -> {ok, 204, _} = request( post, - uri(["authorization", "sources", "built_in_database", "username"]), + uri(["authorization", "sources", "built_in_database", "rules", "users"]), [?USERNAME_RULES_EXAMPLE] ), {ok, 409, _} = request( post, - uri(["authorization", "sources", "built_in_database", "username"]), + uri(["authorization", "sources", "built_in_database", "rules", "users"]), [?USERNAME_RULES_EXAMPLE] ), {ok, 200, Request1} = request( get, - uri(["authorization", "sources", "built_in_database", "username"]), + uri(["authorization", "sources", "built_in_database", "rules", "users"]), [] ), #{ @@ -104,7 +104,8 @@ t_api(_) -> "authorization", "sources", "built_in_database", - "username?page=1&limit=20&like_username=noexist" + "rules", + "users?page=1&limit=20&like_username=noexist" ]), [] ), @@ -120,7 +121,7 @@ t_api(_) -> {ok, 200, Request2} = request( get, - uri(["authorization", "sources", "built_in_database", "username", "user1"]), + uri(["authorization", "sources", "built_in_database", "rules", "users", "user1"]), [] ), #{<<"username">> := <<"user1">>, <<"rules">> := Rules1} = jsx:decode(Request2), @@ -128,13 +129,13 @@ t_api(_) -> {ok, 204, _} = request( put, - uri(["authorization", "sources", "built_in_database", "username", "user1"]), + uri(["authorization", "sources", "built_in_database", "rules", "users", "user1"]), ?USERNAME_RULES_EXAMPLE#{rules => []} ), {ok, 200, Request3} = request( get, - uri(["authorization", "sources", "built_in_database", "username", "user1"]), + uri(["authorization", "sources", "built_in_database", "rules", "users", "user1"]), [] ), #{<<"username">> := <<"user1">>, <<"rules">> := Rules2} = jsx:decode(Request3), @@ -143,46 +144,46 @@ t_api(_) -> {ok, 204, _} = request( delete, - uri(["authorization", "sources", "built_in_database", "username", "user1"]), + uri(["authorization", "sources", "built_in_database", "rules", "users", "user1"]), [] ), {ok, 404, _} = request( get, - uri(["authorization", "sources", "built_in_database", "username", "user1"]), + uri(["authorization", "sources", "built_in_database", "rules", "users", "user1"]), [] ), {ok, 404, _} = request( delete, - uri(["authorization", "sources", "built_in_database", "username", "user1"]), + uri(["authorization", "sources", "built_in_database", "rules", "users", "user1"]), [] ), {ok, 204, _} = request( post, - uri(["authorization", "sources", "built_in_database", "clientid"]), + uri(["authorization", "sources", "built_in_database", "rules", "clients"]), [?CLIENTID_RULES_EXAMPLE] ), {ok, 409, _} = request( post, - uri(["authorization", "sources", "built_in_database", "clientid"]), + uri(["authorization", "sources", "built_in_database", "rules", "clients"]), [?CLIENTID_RULES_EXAMPLE] ), {ok, 200, Request4} = request( get, - uri(["authorization", "sources", "built_in_database", "clientid"]), + uri(["authorization", "sources", "built_in_database", "rules", "clients"]), [] ), {ok, 200, Request5} = request( get, - uri(["authorization", "sources", "built_in_database", "clientid", "client1"]), + uri(["authorization", "sources", "built_in_database", "rules", "clients", "client1"]), [] ), #{ @@ -196,13 +197,13 @@ t_api(_) -> {ok, 204, _} = request( put, - uri(["authorization", "sources", "built_in_database", "clientid", "client1"]), + uri(["authorization", "sources", "built_in_database", "rules", "clients", "client1"]), ?CLIENTID_RULES_EXAMPLE#{rules => []} ), {ok, 200, Request6} = request( get, - uri(["authorization", "sources", "built_in_database", "clientid", "client1"]), + uri(["authorization", "sources", "built_in_database", "rules", "clients", "client1"]), [] ), #{<<"clientid">> := <<"client1">>, <<"rules">> := Rules4} = jsx:decode(Request6), @@ -211,32 +212,32 @@ t_api(_) -> {ok, 204, _} = request( delete, - uri(["authorization", "sources", "built_in_database", "clientid", "client1"]), + uri(["authorization", "sources", "built_in_database", "rules", "clients", "client1"]), [] ), {ok, 404, _} = request( get, - uri(["authorization", "sources", "built_in_database", "clientid", "client1"]), + uri(["authorization", "sources", "built_in_database", "rules", "clients", "client1"]), [] ), {ok, 404, _} = request( delete, - uri(["authorization", "sources", "built_in_database", "clientid", "client1"]), + uri(["authorization", "sources", "built_in_database", "rules", "clients", "client1"]), [] ), {ok, 204, _} = request( post, - uri(["authorization", "sources", "built_in_database", "all"]), + uri(["authorization", "sources", "built_in_database", "rules", "all"]), ?ALL_RULES_EXAMPLE ), {ok, 200, Request7} = request( get, - uri(["authorization", "sources", "built_in_database", "all"]), + uri(["authorization", "sources", "built_in_database", "rules", "all"]), [] ), #{<<"rules">> := Rules5} = jsx:decode(Request7), @@ -244,15 +245,14 @@ t_api(_) -> {ok, 204, _} = request( - post, - uri(["authorization", "sources", "built_in_database", "all"]), - - ?ALL_RULES_EXAMPLE#{rules => []} + delete, + uri(["authorization", "sources", "built_in_database", "rules", "all"]), + [] ), {ok, 200, Request8} = request( get, - uri(["authorization", "sources", "built_in_database", "all"]), + uri(["authorization", "sources", "built_in_database", "rules", "all"]), [] ), #{<<"rules">> := Rules6} = jsx:decode(Request8), @@ -261,7 +261,7 @@ t_api(_) -> {ok, 204, _} = request( post, - uri(["authorization", "sources", "built_in_database", "username"]), + uri(["authorization", "sources", "built_in_database", "rules", "users"]), [ #{username => erlang:integer_to_binary(N), rules => []} || N <- lists:seq(1, 20) @@ -270,7 +270,7 @@ t_api(_) -> {ok, 200, Request9} = request( get, - uri(["authorization", "sources", "built_in_database", "username?page=2&limit=5"]), + uri(["authorization", "sources", "built_in_database", "rules", "users?page=2&limit=5"]), [] ), #{<<"data">> := Data1} = jsx:decode(Request9), @@ -279,7 +279,7 @@ t_api(_) -> {ok, 204, _} = request( post, - uri(["authorization", "sources", "built_in_database", "clientid"]), + uri(["authorization", "sources", "built_in_database", "rules", "clients"]), [ #{clientid => erlang:integer_to_binary(N), rules => []} || N <- lists:seq(1, 20) @@ -288,7 +288,7 @@ t_api(_) -> {ok, 200, Request10} = request( get, - uri(["authorization", "sources", "built_in_database", "clientid?limit=5"]), + uri(["authorization", "sources", "built_in_database", "rules", "clients?limit=5"]), [] ), #{<<"data">> := Data2} = jsx:decode(Request10), @@ -297,7 +297,7 @@ t_api(_) -> {ok, 400, Msg1} = request( delete, - uri(["authorization", "sources", "built_in_database", "purge-all"]), + uri(["authorization", "sources", "built_in_database", "rules"]), [] ), ?assertMatch({match, _}, re:run(Msg1, "must\sbe\sdisabled\sbefore")), @@ -323,7 +323,7 @@ t_api(_) -> {ok, 204, _} = request( delete, - uri(["authorization", "sources", "built_in_database", "purge-all"]), + uri(["authorization", "sources", "built_in_database", "rules"]), [] ), ?assertEqual(0, emqx_authz_mnesia:record_count()), diff --git a/changes/v5.0.13-en.md b/changes/v5.0.13-en.md index 669275958..11c7705a4 100644 --- a/changes/v5.0.13-en.md +++ b/changes/v5.0.13-en.md @@ -20,6 +20,8 @@ - `/bridges_probe` API endpoint to test params for creating a new data bridge [#9585](https://github.com/emqx/emqx/pull/9585). +- Refactor `/authorization/sources/built_in_database/` by adding `rules/` to the path [#9569](https://github.com/emqx/emqx/pull/9569). + ## Bug fixes - Trigger `message.dropped` hook when QoS2 message is resend by client with a same packet id, or 'awaiting_rel' queue is full [#9487](https://github.com/emqx/emqx/pull/9487). diff --git a/changes/v5.0.13-zh.md b/changes/v5.0.13-zh.md index aae629a4a..8b556a6d8 100644 --- a/changes/v5.0.13-zh.md +++ b/changes/v5.0.13-zh.md @@ -18,7 +18,9 @@ - 确保黑名单的默认超期时间足够长 [#9599](https://github.com/emqx/emqx/pull/9599/)。 -- [FIXME] `/bridges_probe` API 端点用于测试创建新数据桥的参数 [#9585](https://github.com/emqx/emqx/pull/9585)。 +- 添加新 API 接口 `/bridges_probe` 用于测试创建桥接的参数是否可用 [#9585](https://github.com/emqx/emqx/pull/9585)。 + +- 重构 `/authorization/sources/built_in_database/` 接口,将 `rules/` 添加到了其路径中 [#9569](https://github.com/emqx/emqx/pull/9569)。 ## 修复 From 8ad82881959c97623ff152206645ed44339ea2c7 Mon Sep 17 00:00:00 2001 From: Stefan Strigler Date: Mon, 9 Jan 2023 17:11:06 +0100 Subject: [PATCH 07/22] feat: report error in create_dry_run --- apps/emqx_bridge/src/emqx_bridge_api.erl | 45 +---------------- .../test/emqx_bridge_api_SUITE.erl | 49 +++++++++++++------ .../src/emqx_resource_manager.erl | 11 +++-- 3 files changed, 43 insertions(+), 62 deletions(-) diff --git a/apps/emqx_bridge/src/emqx_bridge_api.erl b/apps/emqx_bridge/src/emqx_bridge_api.erl index 851089acb..6de7d8695 100644 --- a/apps/emqx_bridge/src/emqx_bridge_api.erl +++ b/apps/emqx_bridge/src/emqx_bridge_api.erl @@ -485,7 +485,7 @@ schema("/bridges_probe") -> RequestMeta = #{module => ?MODULE, method => post, path => "/bridges_probe"}, case emqx_dashboard_swagger:filter_check_request_and_translate_body(Request, RequestMeta) of {ok, #{body := #{<<"type">> := ConnType} = Params}} -> - case do_probe(ConnType, maps:remove(<<"type">>, Params)) of + case emqx_bridge_resource:create_dry_run(ConnType, maps:remove(<<"type">>, Params)) of ok -> {204}; {error, Error} -> @@ -495,49 +495,6 @@ schema("/bridges_probe") -> BadRequest end. -do_probe(ConnType, Params) -> - case test_connection(host_and_port(ConnType, Params)) of - ok -> - emqx_bridge_resource:create_dry_run(ConnType, Params); - Error -> - Error - end. - -host_and_port(mqtt, #{<<"server">> := Server}) -> - case string:split(Server, ":") of - [Host, Port] -> {Host, list_to_integer(Port)}; - _Other -> error(invalid_server, Server) - end; -host_and_port(webhook, #{<<"url">> := Url}) -> - {BaseUrl, _Path} = parse_url(Url), - {ok, #{host := Host, port := Port}} = emqx_http_lib:uri_parse(BaseUrl), - {Host, Port}; -host_and_port(_Unknown, _) -> - undefined. - -%% [TODO] remove in EMQX-8588 when resource manager handles things more elegantly -test_connection(undefined) -> - %% be friendly, it might fail later on with a 'timeout' error. - ok; -test_connection({Host, Port}) -> - case gen_tcp:connect(Host, Port, [], 5000) of - {ok, TestSocket} -> gen_tcp:close(TestSocket); - Error -> Error - end. - -parse_url(Url) -> - case string:split(Url, "//", leading) of - [Scheme, UrlRem] -> - case string:split(UrlRem, "/", leading) of - [HostPort, Path] -> - {iolist_to_binary([Scheme, "//", HostPort]), Path}; - [HostPort] -> - {iolist_to_binary([Scheme, "//", HostPort]), <<>>} - end; - [Url] -> - error({invalid_url, Url}) - end. - lookup_from_all_nodes(BridgeType, BridgeName, SuccCode) -> Nodes = mria_mnesia:running_nodes(), case is_ok(emqx_bridge_proto_v1:lookup_from_all_nodes(Nodes, BridgeType, BridgeName)) of diff --git a/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl index a32019e41..a77da7544 100644 --- a/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl +++ b/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl @@ -33,13 +33,21 @@ ) ) ). --define(HTTP_BRIDGE(URL, TYPE, NAME), #{ +-define(BRIDGE(NAME, TYPE), #{ + <<"ssl">> => #{<<"enable">> => false}, <<"type">> => TYPE, - <<"name">> => NAME, + <<"name">> => NAME +}). +-define(MQTT_BRIDGE(SERVER), ?BRIDGE(<<"mqtt_egress_test_bridge">>, <<"mqtt">>)#{ + <<"server">> => SERVER, + <<"username">> => <<"user1">>, + <<"password">> => <<"">>, + <<"proto_ver">> => <<"v5">> +}). +-define(HTTP_BRIDGE(URL, TYPE, NAME), ?BRIDGE(NAME, TYPE)#{ <<"url">> => URL, <<"local_topic">> => <<"emqx_webhook/#">>, <<"method">> => <<"post">>, - <<"ssl">> => #{<<"enable">> => false}, <<"body">> => <<"${payload}">>, <<"headers">> => #{ <<"content-type">> => <<"application/json">> @@ -596,16 +604,6 @@ t_with_redact_update(_Config) -> ?assertEqual(Password, Value), ok. --define(MQTT_BRIDGE(Server), #{ - <<"server">> => Server, - <<"username">> => <<"user1">>, - <<"password">> => <<"">>, - <<"proto_ver">> => <<"v5">>, - <<"ssl">> => #{<<"enable">> => false}, - <<"type">> => <<"mqtt">>, - <<"name">> => <<"mqtt_egress_test_bridge">> -}). - t_bridges_probe(Config) -> Port = ?config(port, Config), URL = ?URL(Port, "some_path"), @@ -623,11 +621,18 @@ t_bridges_probe(Config) -> ?HTTP_BRIDGE(URL, ?BRIDGE_TYPE, ?BRIDGE_NAME) ), - {ok, 400, _} = request( + {ok, 400, NxDomain} = request( post, uri(["bridges_probe"]), ?HTTP_BRIDGE(<<"http://203.0.113.3:1234/foo">>, ?BRIDGE_TYPE, ?BRIDGE_NAME) ), + ?assertMatch( + #{ + <<"code">> := <<"TEST_FAILED">>, + <<"message">> := _ + }, + jsx:decode(NxDomain) + ), {ok, 204, _} = request( post, @@ -635,11 +640,25 @@ t_bridges_probe(Config) -> ?MQTT_BRIDGE(<<"127.0.0.1:1883">>) ), - {ok, 400, _} = request( + {ok, 400, ConnRefused} = request( post, uri(["bridges_probe"]), ?MQTT_BRIDGE(<<"127.0.0.1:2883">>) ), + ?assertMatch( + #{ + <<"code">> := <<"TEST_FAILED">>, + <<"message">> := <<"#{reason => econnrefused", _/binary>> + }, + jsx:decode(ConnRefused) + ), + + {ok, 400, BadReq} = request( + post, + uri(["bridges_probe"]), + ?BRIDGE(<<"bad_bridge">>, <<"unknown_type">>) + ), + ?assertMatch(#{<<"code">> := <<"BAD_REQUEST">>}, jsx:decode(BadReq)), ok. request(Method, Url, Body) -> diff --git a/apps/emqx_resource/src/emqx_resource_manager.erl b/apps/emqx_resource/src/emqx_resource_manager.erl index 8ad3fdd80..821bcbc5c 100644 --- a/apps/emqx_resource/src/emqx_resource_manager.erl +++ b/apps/emqx_resource/src/emqx_resource_manager.erl @@ -174,6 +174,9 @@ create_dry_run(ResourceType, Config) -> case wait_for_ready(ResId, 15000) of ok -> remove(ResId); + {error, Reason} -> + _ = remove(ResId), + {error, Reason}; timeout -> _ = remove(ResId), {error, timeout} @@ -632,16 +635,18 @@ data_record_to_external_map_with_metrics(Data) -> metrics => get_metrics(Data#data.id) }. --spec wait_for_ready(resource_id(), integer()) -> ok | timeout. +-spec wait_for_ready(resource_id(), integer()) -> ok | timeout | {error, term()}. wait_for_ready(ResId, WaitTime) -> do_wait_for_ready(ResId, WaitTime div ?WAIT_FOR_RESOURCE_DELAY). do_wait_for_ready(_ResId, 0) -> timeout; do_wait_for_ready(ResId, Retry) -> - case ets_lookup(ResId) of - {ok, _Group, #{status := connected}} -> + case read_cache(ResId) of + {_Group, #data{status = connected}} -> ok; + {_Group, #data{status = disconnected, error = Reason}} -> + {error, Reason}; _ -> timer:sleep(?WAIT_FOR_RESOURCE_DELAY), do_wait_for_ready(ResId, Retry - 1) From 77f043dedff31017cbf9ef46e98776b1f9228f58 Mon Sep 17 00:00:00 2001 From: Stefan Strigler Date: Wed, 11 Jan 2023 14:43:52 +0100 Subject: [PATCH 08/22] fix: don't require username and password not enforced by schema so we shouldn't either --- .../src/emqx_connector_mysql.erl | 37 ++++++++++++++----- 1 file changed, 27 insertions(+), 10 deletions(-) diff --git a/apps/emqx_connector/src/emqx_connector_mysql.erl b/apps/emqx_connector/src/emqx_connector_mysql.erl index 6c0ff7210..693917a27 100644 --- a/apps/emqx_connector/src/emqx_connector_mysql.erl +++ b/apps/emqx_connector/src/emqx_connector_mysql.erl @@ -66,10 +66,21 @@ roots() -> fields(config) -> [{server, server()}] ++ - emqx_connector_schema_lib:relational_db_fields() ++ + add_default_username(emqx_connector_schema_lib:relational_db_fields(), []) ++ emqx_connector_schema_lib:ssl_fields() ++ emqx_connector_schema_lib:prepare_statement_fields(). +add_default_username([{username, OrigUsernameFn} | Tail], Head) -> + Head ++ [{username, add_default_fn(OrigUsernameFn, <<"root">>)} | Tail]; +add_default_username([Field | Tail], Head) -> + add_default_username(Tail, Head ++ [Field]). + +add_default_fn(OrigFn, Default) -> + fun + (default) -> Default; + (Field) -> OrigFn(Field) + end. + server() -> Meta = #{desc => ?DESC("server")}, emqx_schema:servers_sc(Meta, ?MYSQL_HOST_OPTIONS). @@ -83,8 +94,7 @@ on_start( #{ server := Server, database := DB, - username := User, - password := Password, + username := Username, auto_reconnect := AutoReconn, pool_size := PoolSize, ssl := SSL @@ -104,13 +114,15 @@ on_start( [] end, Options = [ - {host, Host}, - {port, Port}, - {user, User}, - {password, Password}, - {database, DB}, - {auto_reconnect, reconn_interval(AutoReconn)}, - {pool_size, PoolSize} + maybe_password_opt(maps:get(password, Config, undefined)) + | [ + {host, Host}, + {port, Port}, + {user, Username}, + {database, DB}, + {auto_reconnect, reconn_interval(AutoReconn)}, + {pool_size, PoolSize} + ] ], PoolName = emqx_plugin_libs_pool:pool_name(InstId), Prepares = parse_prepare_sql(Config), @@ -126,6 +138,11 @@ on_start( {error, Reason} end. +maybe_password_opt(undefined) -> + []; +maybe_password_opt(Password) -> + {password, Password}. + on_stop(InstId, #{poolname := PoolName}) -> ?SLOG(info, #{ msg => "stopping_mysql_connector", From 04f46f5227836bb451852d36d54f7bf832904a47 Mon Sep 17 00:00:00 2001 From: Stefan Strigler Date: Wed, 11 Jan 2023 15:11:41 +0100 Subject: [PATCH 09/22] feat: make http connector report errors --- .../src/emqx_connector_http.erl | 29 +++++++++++-------- 1 file changed, 17 insertions(+), 12 deletions(-) diff --git a/apps/emqx_connector/src/emqx_connector_http.erl b/apps/emqx_connector/src/emqx_connector_http.erl index 18a246edb..55e6d6f8e 100644 --- a/apps/emqx_connector/src/emqx_connector_http.erl +++ b/apps/emqx_connector/src/emqx_connector_http.erl @@ -384,14 +384,15 @@ on_query_async( on_get_status(_InstId, #{pool_name := PoolName, connect_timeout := Timeout} = State) -> case do_get_status(PoolName, Timeout) of - true -> - connected; - false -> + ok -> + {connected, State}; + {error, Reason} -> ?SLOG(error, #{ msg => "http_connector_get_status_failed", + reason => Reason, state => State }), - disconnected + {disconnected, State, Reason} end. do_get_status(PoolName, Timeout) -> @@ -400,24 +401,28 @@ do_get_status(PoolName, Timeout) -> fun(Worker) -> case ehttpc:health_check(Worker, Timeout) of ok -> - true; - {error, Reason} -> + ok; + {error, Reason} = Error -> ?SLOG(error, #{ msg => "ehttpc_health_check_failed", reason => Reason, worker => Worker }), - false + Error end end, try emqx_misc:pmap(DoPerWorker, Workers, Timeout) of - [_ | _] = Status -> - lists:all(fun(St) -> St =:= true end, Status); - [] -> - false + % we crash in case of non-empty lists since we don't know what to do in that case + [_ | _] = Results -> + case [E || {error, _} = E <- Results] of + [] -> + ok; + Errors -> + hd(Errors) + end catch exit:timeout -> - false + {error, timeout} end. %%-------------------------------------------------------------------- From 2a81fa152263381cffaa52d7bbe93b3212e8b22a Mon Sep 17 00:00:00 2001 From: Stefan Strigler Date: Thu, 12 Jan 2023 14:29:11 +0100 Subject: [PATCH 10/22] fix: remove redundant log message --- apps/emqx_connector/src/emqx_connector_http.erl | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/apps/emqx_connector/src/emqx_connector_http.erl b/apps/emqx_connector/src/emqx_connector_http.erl index 55e6d6f8e..286e0e4e6 100644 --- a/apps/emqx_connector/src/emqx_connector_http.erl +++ b/apps/emqx_connector/src/emqx_connector_http.erl @@ -387,11 +387,6 @@ on_get_status(_InstId, #{pool_name := PoolName, connect_timeout := Timeout} = St ok -> {connected, State}; {error, Reason} -> - ?SLOG(error, #{ - msg => "http_connector_get_status_failed", - reason => Reason, - state => State - }), {disconnected, State, Reason} end. @@ -404,7 +399,7 @@ do_get_status(PoolName, Timeout) -> ok; {error, Reason} = Error -> ?SLOG(error, #{ - msg => "ehttpc_health_check_failed", + msg => "http_connector_get_status_failed", reason => Reason, worker => Worker }), From c2fd1a4482a30307fa986384d3d611bfcdeb03d8 Mon Sep 17 00:00:00 2001 From: Erik Timan Date: Thu, 12 Jan 2023 10:10:52 +0100 Subject: [PATCH 11/22] feat(emqx_bridge): shorten operation endpoint URLs This shortens and simplifies URLs for performing bridge operations so that the API looks more congruent. --- apps/emqx_bridge/src/emqx_bridge_api.erl | 20 +++++++++---------- .../test/emqx_bridge_api_SUITE.erl | 4 ++-- 2 files changed, 12 insertions(+), 12 deletions(-) diff --git a/apps/emqx_bridge/src/emqx_bridge_api.erl b/apps/emqx_bridge/src/emqx_bridge_api.erl index cf39ebf14..a71142bc5 100644 --- a/apps/emqx_bridge/src/emqx_bridge_api.erl +++ b/apps/emqx_bridge/src/emqx_bridge_api.erl @@ -36,8 +36,8 @@ -export([ '/bridges'/2, '/bridges/:id'/2, - '/bridges/:id/operation/:operation'/2, - '/nodes/:node/bridges/:id/operation/:operation'/2, + '/bridges/:id/:operation'/2, + '/nodes/:node/bridges/:id/:operation'/2, '/bridges/:id/reset_metrics'/2 ]). @@ -66,8 +66,8 @@ paths() -> [ "/bridges", "/bridges/:id", - "/bridges/:id/operation/:operation", - "/nodes/:node/bridges/:id/operation/:operation", + "/bridges/:id/:operation", + "/nodes/:node/bridges/:id/:operation", "/bridges/:id/reset_metrics" ]. @@ -348,9 +348,9 @@ schema("/bridges/:id/reset_metrics") -> } } }; -schema("/bridges/:id/operation/:operation") -> +schema("/bridges/:id/:operation") -> #{ - 'operationId' => '/bridges/:id/operation/:operation', + 'operationId' => '/bridges/:id/:operation', post => #{ tags => [<<"bridges">>], summary => <<"Enable/Disable/Stop/Restart Bridge">>, @@ -366,9 +366,9 @@ schema("/bridges/:id/operation/:operation") -> } } }; -schema("/nodes/:node/bridges/:id/operation/:operation") -> +schema("/nodes/:node/bridges/:id/:operation") -> #{ - 'operationId' => '/nodes/:node/bridges/:id/operation/:operation', + 'operationId' => '/nodes/:node/bridges/:id/:operation', post => #{ tags => [<<"bridges">>], summary => <<"Stop/Restart Bridge">>, @@ -485,7 +485,7 @@ lookup_from_local_node(BridgeType, BridgeName) -> Error -> Error end. -'/bridges/:id/operation/:operation'(post, #{ +'/bridges/:id/:operation'(post, #{ bindings := #{id := Id, operation := Op} }) -> @@ -513,7 +513,7 @@ lookup_from_local_node(BridgeType, BridgeName) -> end ). -'/nodes/:node/bridges/:id/operation/:operation'(post, #{ +'/nodes/:node/bridges/:id/:operation'(post, #{ bindings := #{id := Id, operation := Op, node := Node} }) -> diff --git a/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl index 4d16f1692..a6b5ece89 100644 --- a/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl +++ b/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl @@ -600,9 +600,9 @@ t_with_redact_update(_Config) -> ok. operation_path(node, Oper, BridgeID) -> - uri(["nodes", node(), "bridges", BridgeID, "operation", Oper]); + uri(["nodes", node(), "bridges", BridgeID, Oper]); operation_path(cluster, Oper, BridgeID) -> - uri(["bridges", BridgeID, "operation", Oper]). + uri(["bridges", BridgeID, Oper]). str(S) when is_list(S) -> S; str(S) when is_binary(S) -> binary_to_list(S). From 860e21d40f1b32eb4e3ac03d93679fbac6dd9e2b Mon Sep 17 00:00:00 2001 From: Erik Timan Date: Thu, 12 Jan 2023 10:39:19 +0100 Subject: [PATCH 12/22] feat(emqx_bridge): move metrics to own endpoint, rename reset_metrics In order for the /bridges APIs to be consistent with other APIs, we move out metrics from GET /bridges/{id} to its own endpoint, /bridges/{id}/metrics. We also rename /bridges/reset_metrics to /bridges/metrics/reset. --- apps/emqx_bridge/i18n/emqx_bridge_api.conf | 10 ++ apps/emqx_bridge/src/emqx_bridge_api.erl | 78 +++++++++++---- .../src/schema/emqx_bridge_mqtt_schema.erl | 2 +- .../src/schema/emqx_bridge_schema.erl | 21 ++-- .../src/schema/emqx_bridge_webhook_schema.erl | 2 +- .../test/emqx_bridge_api_SUITE.erl | 96 ++++++++++++++++--- .../src/emqx_ee_bridge_gcp_pubsub.erl | 2 +- .../src/emqx_ee_bridge_hstreamdb.erl | 2 +- .../src/emqx_ee_bridge_influxdb.erl | 2 +- .../src/emqx_ee_bridge_kafka.erl | 2 +- .../src/emqx_ee_bridge_mongodb.erl | 6 +- .../src/emqx_ee_bridge_mysql.erl | 2 +- .../src/emqx_ee_bridge_pgsql.erl | 2 +- .../src/emqx_ee_bridge_redis.erl | 2 +- 14 files changed, 176 insertions(+), 53 deletions(-) diff --git a/apps/emqx_bridge/i18n/emqx_bridge_api.conf b/apps/emqx_bridge/i18n/emqx_bridge_api.conf index e8bb2403a..796cbb91c 100644 --- a/apps/emqx_bridge/i18n/emqx_bridge_api.conf +++ b/apps/emqx_bridge/i18n/emqx_bridge_api.conf @@ -134,4 +134,14 @@ NOTE:不允许在单节点上启用/禁用 Bridge""" } } + desc_bridge_metrics { + desc { + en: """Get bridge metrics by Id""" + zh: """""" + } + label: { + en: "Get Bridge Metrics" + zh: "" + } + } } diff --git a/apps/emqx_bridge/src/emqx_bridge_api.erl b/apps/emqx_bridge/src/emqx_bridge_api.erl index a71142bc5..b05e31b11 100644 --- a/apps/emqx_bridge/src/emqx_bridge_api.erl +++ b/apps/emqx_bridge/src/emqx_bridge_api.erl @@ -38,7 +38,8 @@ '/bridges/:id'/2, '/bridges/:id/:operation'/2, '/nodes/:node/bridges/:id/:operation'/2, - '/bridges/:id/reset_metrics'/2 + '/bridges/:id/metrics'/2, + '/bridges/:id/metrics/reset'/2 ]). -export([lookup_from_local_node/2]). @@ -68,7 +69,8 @@ paths() -> "/bridges/:id", "/bridges/:id/:operation", "/nodes/:node/bridges/:id/:operation", - "/bridges/:id/reset_metrics" + "/bridges/:id/metrics", + "/bridges/:id/metrics/reset" ]. error_schema(Code, Message) when is_atom(Code) -> @@ -132,19 +134,22 @@ param_path_id() -> } )}. -bridge_info_array_example(Method) -> - [Config || #{value := Config} <- maps:values(bridge_info_examples(Method))]. +bridge_info_array_example(Method, WithMetrics) -> + [Config || #{value := Config} <- maps:values(bridge_info_examples(Method, WithMetrics))]. bridge_info_examples(Method) -> + bridge_info_examples(Method, false). + +bridge_info_examples(Method, WithMetrics) -> maps:merge( #{ <<"webhook_example">> => #{ summary => <<"WebHook">>, - value => info_example(webhook, Method) + value => info_example(webhook, Method, WithMetrics) }, <<"mqtt_example">> => #{ summary => <<"MQTT Bridge">>, - value => info_example(mqtt, Method) + value => info_example(mqtt, Method, WithMetrics) } }, ee_bridge_examples(Method) @@ -157,24 +162,24 @@ ee_bridge_examples(Method) -> _:_ -> #{} end. -info_example(Type, Method) -> +info_example(Type, Method, WithMetrics) -> maps:merge( info_example_basic(Type), - method_example(Type, Method) + method_example(Type, Method, WithMetrics) ). -method_example(Type, Method) when Method == get; Method == post -> +method_example(Type, Method, WithMetrics) when Method == get; Method == post -> SType = atom_to_list(Type), SName = SType ++ "_example", TypeNameExam = #{ type => bin(SType), name => bin(SName) }, - maybe_with_metrics_example(TypeNameExam, Method); -method_example(_Type, put) -> + maybe_with_metrics_example(TypeNameExam, Method, WithMetrics); +method_example(_Type, put, _WithMetrics) -> #{}. -maybe_with_metrics_example(TypeNameExam, get) -> +maybe_with_metrics_example(TypeNameExam, get, true) -> TypeNameExam#{ metrics => ?EMPTY_METRICS, node_metrics => [ @@ -184,7 +189,7 @@ maybe_with_metrics_example(TypeNameExam, get) -> } ] }; -maybe_with_metrics_example(TypeNameExam, _) -> +maybe_with_metrics_example(TypeNameExam, _, _) -> TypeNameExam. info_example_basic(webhook) -> @@ -274,7 +279,7 @@ schema("/bridges") -> responses => #{ 200 => emqx_dashboard_swagger:schema_with_example( array(emqx_bridge_schema:get_response()), - bridge_info_array_example(get) + bridge_info_array_example(get, true) ) } }, @@ -334,9 +339,23 @@ schema("/bridges/:id") -> } } }; -schema("/bridges/:id/reset_metrics") -> +schema("/bridges/:id/metrics") -> #{ - 'operationId' => '/bridges/:id/reset_metrics', + 'operationId' => '/bridges/:id/metrics', + get => #{ + tags => [<<"bridges">>], + summary => <<"Get Bridge Metrics">>, + description => ?DESC("desc_bridge_metrics"), + parameters => [param_path_id()], + responses => #{ + 200 => emqx_bridge_schema:metrics_fields(), + 404 => error_schema('NOT_FOUND', "Bridge not found") + } + } + }; +schema("/bridges/:id/metrics/reset") -> + #{ + 'operationId' => '/bridges/:id/metrics/reset', put => #{ tags => [<<"bridges">>], summary => <<"Reset Bridge Metrics">>, @@ -455,7 +474,10 @@ schema("/nodes/:node/bridges/:id/:operation") -> end ). -'/bridges/:id/reset_metrics'(put, #{bindings := #{id := Id}}) -> +'/bridges/:id/metrics'(get, #{bindings := #{id := Id}}) -> + ?TRY_PARSE_ID(Id, lookup_from_all_nodes_metrics(BridgeType, BridgeName, 200)). + +'/bridges/:id/metrics/reset'(put, #{bindings := #{id := Id}}) -> ?TRY_PARSE_ID( Id, case @@ -469,10 +491,18 @@ schema("/nodes/:node/bridges/:id/:operation") -> ). lookup_from_all_nodes(BridgeType, BridgeName, SuccCode) -> + FormatFun = fun format_bridge_info_without_metrics/1, + do_lookup_from_all_nodes(BridgeType, BridgeName, SuccCode, FormatFun). + +lookup_from_all_nodes_metrics(BridgeType, BridgeName, SuccCode) -> + FormatFun = fun format_bridge_metrics/1, + do_lookup_from_all_nodes(BridgeType, BridgeName, SuccCode, FormatFun). + +do_lookup_from_all_nodes(BridgeType, BridgeName, SuccCode, FormatFun) -> Nodes = mria_mnesia:running_nodes(), case is_ok(emqx_bridge_proto_v1:lookup_from_all_nodes(Nodes, BridgeType, BridgeName)) of {ok, [{ok, _} | _] = Results} -> - {SuccCode, format_bridge_info([R || {ok, R} <- Results])}; + {SuccCode, FormatFun([R || {ok, R} <- Results])}; {ok, [{error, not_found} | _]} -> {404, error_msg('NOT_FOUND', <<"not_found">>)}; {error, ErrL} -> @@ -572,7 +602,7 @@ zip_bridges([BridgesFirstNode | _] = BridgesAllNodes) -> lists:foldl( fun(#{type := Type, name := Name}, Acc) -> Bridges = pick_bridges_by_id(Type, Name, BridgesAllNodes), - [format_bridge_info(Bridges) | Acc] + [format_bridge_info_with_metrics(Bridges) | Acc] end, [], BridgesFirstNode @@ -606,7 +636,7 @@ pick_bridges_by_id(Type, Name, BridgesAllNodes) -> BridgesAllNodes ). -format_bridge_info([FirstBridge | _] = Bridges) -> +format_bridge_info_with_metrics([FirstBridge | _] = Bridges) -> Res = maps:remove(node, FirstBridge), NodeStatus = collect_status(Bridges), NodeMetrics = collect_metrics(Bridges), @@ -617,6 +647,14 @@ format_bridge_info([FirstBridge | _] = Bridges) -> node_metrics => NodeMetrics }). +format_bridge_info_without_metrics(Bridges) -> + Res = format_bridge_info_with_metrics(Bridges), + maps:without([metrics, node_metrics], Res). + +format_bridge_metrics(Bridges) -> + Res = format_bridge_info_with_metrics(Bridges), + maps:with([metrics, node_metrics], Res). + collect_status(Bridges) -> [maps:with([node, status], B) || B <- Bridges]. diff --git a/apps/emqx_bridge/src/schema/emqx_bridge_mqtt_schema.erl b/apps/emqx_bridge/src/schema/emqx_bridge_mqtt_schema.erl index 4665a3bc5..5cd1693c7 100644 --- a/apps/emqx_bridge/src/schema/emqx_bridge_mqtt_schema.erl +++ b/apps/emqx_bridge/src/schema/emqx_bridge_mqtt_schema.erl @@ -51,7 +51,7 @@ fields("post") -> fields("put") -> fields("config"); fields("get") -> - emqx_bridge_schema:metrics_status_fields() ++ fields("config"). + emqx_bridge_schema:status_fields() ++ fields("config"). desc("config") -> ?DESC("config"); diff --git a/apps/emqx_bridge/src/schema/emqx_bridge_schema.erl b/apps/emqx_bridge/src/schema/emqx_bridge_schema.erl index 845c1ef90..09a99488e 100644 --- a/apps/emqx_bridge/src/schema/emqx_bridge_schema.erl +++ b/apps/emqx_bridge/src/schema/emqx_bridge_schema.erl @@ -30,7 +30,8 @@ -export([ common_bridge_fields/0, - metrics_status_fields/0 + status_fields/0, + metrics_fields/0 ]). %%====================================================================================== @@ -83,19 +84,23 @@ common_bridge_fields() -> )} ]. -metrics_status_fields() -> +status_fields() -> + [ + {"status", mk(status(), #{desc => ?DESC("desc_status")})}, + {"node_status", + mk( + hoconsc:array(ref(?MODULE, "node_status")), + #{desc => ?DESC("desc_node_status")} + )} + ]. + +metrics_fields() -> [ {"metrics", mk(ref(?MODULE, "metrics"), #{desc => ?DESC("desc_metrics")})}, {"node_metrics", mk( hoconsc:array(ref(?MODULE, "node_metrics")), #{desc => ?DESC("desc_node_metrics")} - )}, - {"status", mk(status(), #{desc => ?DESC("desc_status")})}, - {"node_status", - mk( - hoconsc:array(ref(?MODULE, "node_status")), - #{desc => ?DESC("desc_node_status")} )} ]. diff --git a/apps/emqx_bridge/src/schema/emqx_bridge_webhook_schema.erl b/apps/emqx_bridge/src/schema/emqx_bridge_webhook_schema.erl index 0495911e7..b495436a4 100644 --- a/apps/emqx_bridge/src/schema/emqx_bridge_webhook_schema.erl +++ b/apps/emqx_bridge/src/schema/emqx_bridge_webhook_schema.erl @@ -38,7 +38,7 @@ fields("post") -> fields("put") -> fields("config"); fields("get") -> - emqx_bridge_schema:metrics_status_fields() ++ fields("post"); + emqx_bridge_schema:status_fields() ++ fields("post"); fields("creation_opts") -> lists:filter( fun({K, _V}) -> diff --git a/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl index a6b5ece89..eb86b923f 100644 --- a/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl +++ b/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl @@ -187,8 +187,6 @@ t_http_crud_apis(Config) -> <<"enable">> := true, <<"status">> := _, <<"node_status">> := [_ | _], - <<"metrics">> := _, - <<"node_metrics">> := [_ | _], <<"url">> := URL1 } = jsx:decode(Bridge), @@ -225,8 +223,6 @@ t_http_crud_apis(Config) -> <<"enable">> := true, <<"status">> := _, <<"node_status">> := [_ | _], - <<"metrics">> := _, - <<"node_metrics">> := [_ | _], <<"url">> := URL2 }, jsx:decode(Bridge2) @@ -259,8 +255,6 @@ t_http_crud_apis(Config) -> <<"enable">> := true, <<"status">> := _, <<"node_status">> := [_ | _], - <<"metrics">> := _, - <<"node_metrics">> := [_ | _], <<"url">> := URL2 }, jsx:decode(Bridge3Str) @@ -456,8 +450,6 @@ do_start_stop_bridges(Type, Config) -> <<"enable">> := true, <<"status">> := <<"connected">>, <<"node_status">> := [_ | _], - <<"metrics">> := _, - <<"node_metrics">> := [_ | _], <<"url">> := URL1 } = jsx:decode(Bridge), BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE, Name), @@ -502,8 +494,6 @@ t_enable_disable_bridges(Config) -> <<"enable">> := true, <<"status">> := <<"connected">>, <<"node_status">> := [_ | _], - <<"metrics">> := _, - <<"node_metrics">> := [_ | _], <<"url">> := URL1 } = jsx:decode(Bridge), BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE, Name), @@ -555,12 +545,10 @@ t_reset_bridges(Config) -> <<"enable">> := true, <<"status">> := <<"connected">>, <<"node_status">> := [_ | _], - <<"metrics">> := _, - <<"node_metrics">> := [_ | _], <<"url">> := URL1 } = jsx:decode(Bridge), BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE, Name), - {ok, 200, <<"Reset success">>} = request(put, uri(["bridges", BridgeID, "reset_metrics"]), []), + {ok, 200, <<"Reset success">>} = request(put, uri(["bridges", BridgeID, "metrics/reset"]), []), %% delete the bridge {ok, 204, <<>>} = request(delete, uri(["bridges", BridgeID]), []), @@ -599,6 +587,88 @@ t_with_redact_update(_Config) -> ?assertEqual(Password, Value), ok. +t_metrics(Config) -> + Port = ?config(port, Config), + %% assert we there's no bridges at first + {ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []), + + %% then we add a webhook bridge, using POST + %% POST /bridges/ will create a bridge + URL1 = ?URL(Port, "path1"), + Name = ?BRIDGE_NAME, + {ok, 201, Bridge} = request( + post, + uri(["bridges"]), + ?HTTP_BRIDGE(URL1, ?BRIDGE_TYPE, Name) + ), + + %ct:pal("---bridge: ~p", [Bridge]), + #{ + <<"type">> := ?BRIDGE_TYPE, + <<"name">> := Name, + <<"enable">> := true, + <<"status">> := _, + <<"node_status">> := [_ | _], + <<"url">> := URL1 + } = jsx:decode(Bridge), + + BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE, Name), + + %% check for empty bridge metrics + {ok, 200, Bridge1Str} = request(get, uri(["bridges", BridgeID, "metrics"]), []), + ct:pal("HERE ~p", [jsx:decode(Bridge1Str)]), + ?assertMatch( + #{ + <<"metrics">> := #{<<"success">> := 0}, + <<"node_metrics">> := [_ | _] + }, + jsx:decode(Bridge1Str) + ), + + %% send an message to emqx and the message should be forwarded to the HTTP server + Body = <<"my msg">>, + emqx:publish(emqx_message:make(<<"emqx_webhook/1">>, Body)), + ?assert( + receive + {http_server, received, #{ + method := <<"POST">>, + path := <<"/path1">>, + body := Body + }} -> + true; + Msg -> + ct:pal("error: http got unexpected request: ~p", [Msg]), + false + after 100 -> + false + end + ), + + %% check for non-empty bridge metrics + {ok, 200, Bridge2Str} = request(get, uri(["bridges", BridgeID, "metrics"]), []), + ct:pal("HERE ~p", [jsx:decode(Bridge2Str)]), + ?assertMatch( + #{ + <<"metrics">> := #{<<"success">> := 1}, + <<"node_metrics">> := [_ | _] + }, + jsx:decode(Bridge2Str) + ), + + %% check for non-empty metrics when listing all bridges + {ok, 200, BridgesStr} = request(get, uri(["bridges"]), []), + ct:pal("HERE ~p", [jsx:decode(BridgesStr)]), + ?assertMatch( + [ + #{ + <<"metrics">> := #{<<"success">> := 1}, + <<"node_metrics">> := [_ | _] + } + ], + jsx:decode(BridgesStr) + ), + ok. + operation_path(node, Oper, BridgeID) -> uri(["nodes", node(), "bridges", BridgeID, Oper]); operation_path(cluster, Oper, BridgeID) -> diff --git a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_gcp_pubsub.erl b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_gcp_pubsub.erl index 760aba9e1..83fe31b49 100644 --- a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_gcp_pubsub.erl +++ b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_gcp_pubsub.erl @@ -124,7 +124,7 @@ fields(bridge_config) -> )} ]; fields("get") -> - emqx_bridge_schema:metrics_status_fields() ++ fields("post"); + emqx_bridge_schema:status_fields() ++ fields("post"); fields("post") -> [type_field(), name_field() | fields("config")]; fields("put") -> diff --git a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_hstreamdb.erl b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_hstreamdb.erl index 135087929..6e0c711b2 100644 --- a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_hstreamdb.erl +++ b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_hstreamdb.erl @@ -67,7 +67,7 @@ fields("post") -> fields("put") -> fields("config"); fields("get") -> - emqx_bridge_schema:metrics_status_fields() ++ fields("post"). + emqx_bridge_schema:status_fields() ++ fields("post"). field(connector) -> mk( diff --git a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_influxdb.erl b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_influxdb.erl index 62c8b6ab7..fece72e82 100644 --- a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_influxdb.erl +++ b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_influxdb.erl @@ -139,7 +139,7 @@ method_fileds(get, ConnectorType) -> influxdb_bridge_common_fields() ++ connector_fields(ConnectorType) ++ type_name_fields(ConnectorType) ++ - emqx_bridge_schema:metrics_status_fields(); + emqx_bridge_schema:status_fields(); method_fileds(put, ConnectorType) -> influxdb_bridge_common_fields() ++ connector_fields(ConnectorType). diff --git a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_kafka.erl b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_kafka.erl index 3f2f6a85f..9fae4f30a 100644 --- a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_kafka.erl +++ b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_kafka.erl @@ -67,7 +67,7 @@ fields("post") -> fields("put") -> fields("config"); fields("get") -> - emqx_bridge_schema:metrics_status_fields() ++ fields("post"); + emqx_bridge_schema:status_fields() ++ fields("post"); fields("config") -> [ {enable, mk(boolean(), #{desc => ?DESC("config_enable"), default => true})}, diff --git a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_mongodb.erl b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_mongodb.erl index bb4082681..eaf2b7da1 100644 --- a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_mongodb.erl +++ b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_mongodb.erl @@ -59,15 +59,15 @@ fields("put_sharded") -> fields("put_single") -> fields(mongodb_single); fields("get_rs") -> - emqx_bridge_schema:metrics_status_fields() ++ + emqx_bridge_schema:status_fields() ++ fields(mongodb_rs) ++ type_and_name_fields(mongodb_rs); fields("get_sharded") -> - emqx_bridge_schema:metrics_status_fields() ++ + emqx_bridge_schema:status_fields() ++ fields(mongodb_sharded) ++ type_and_name_fields(mongodb_sharded); fields("get_single") -> - emqx_bridge_schema:metrics_status_fields() ++ + emqx_bridge_schema:status_fields() ++ fields(mongodb_single) ++ type_and_name_fields(mongodb_single). diff --git a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_mysql.erl b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_mysql.erl index 114459149..71f8a8399 100644 --- a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_mysql.erl +++ b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_mysql.erl @@ -104,7 +104,7 @@ fields("post") -> fields("put") -> fields("config"); fields("get") -> - emqx_bridge_schema:metrics_status_fields() ++ fields("post"). + emqx_bridge_schema:status_fields() ++ fields("post"). desc("config") -> ?DESC("desc_config"); diff --git a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_pgsql.erl b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_pgsql.erl index be9fc9dc8..7e21d4dd7 100644 --- a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_pgsql.erl +++ b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_pgsql.erl @@ -106,7 +106,7 @@ fields("post") -> fields("put") -> fields("config"); fields("get") -> - emqx_bridge_schema:metrics_status_fields() ++ fields("post"). + emqx_bridge_schema:status_fields() ++ fields("post"). fields("post", Type) -> [type_field(Type), name_field() | fields("config")]. diff --git a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_redis.erl b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_redis.erl index 5c273e050..3a3963786 100644 --- a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_redis.erl +++ b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_redis.erl @@ -126,7 +126,7 @@ method_fileds(get, ConnectorType) -> redis_bridge_common_fields() ++ connector_fields(ConnectorType) ++ type_name_fields(ConnectorType) ++ - emqx_bridge_schema:metrics_status_fields(); + emqx_bridge_schema:status_fields(); method_fileds(put, ConnectorType) -> redis_bridge_common_fields() ++ connector_fields(ConnectorType). From 42f42de4d9fad1666d1b76d758623d3f4dbc2edf Mon Sep 17 00:00:00 2001 From: Erik Timan Date: Thu, 12 Jan 2023 19:32:38 +0100 Subject: [PATCH 13/22] feat(emqx_bridge): add separate endpoint for enable/disable of bridge In order to improve the consistency with other API endpoints, we move the enable/disable operations to a separate endpoint /bridges/{id}/enable/[true,false]. --- apps/emqx_bridge/i18n/emqx_bridge_api.conf | 35 +++++++-- apps/emqx_bridge/src/emqx_bridge_api.erl | 75 ++++++++++++++----- .../test/emqx_bridge_api_SUITE.erl | 13 ++-- 3 files changed, 93 insertions(+), 30 deletions(-) diff --git a/apps/emqx_bridge/i18n/emqx_bridge_api.conf b/apps/emqx_bridge/i18n/emqx_bridge_api.conf index 796cbb91c..8adda9355 100644 --- a/apps/emqx_bridge/i18n/emqx_bridge_api.conf +++ b/apps/emqx_bridge/i18n/emqx_bridge_api.conf @@ -2,8 +2,8 @@ emqx_bridge_api { desc_param_path_operation_cluster { desc { - en: """Operations can be one of: enable, disable, start, stop, restart""" - zh: """集群可用操作:启用、禁用、启动、停止、重新启动""" + en: """Operations can be one of: start, stop, restart""" + zh: """""" } label: { en: "Cluster Operation" @@ -44,6 +44,16 @@ emqx_bridge_api { } } + desc_param_path_enable { + desc { + en: """Whether or not the bridge is enabled""" + zh: """""" + } + label: { + en: "Enable bridge" + zh: "" + } + } desc_api1 { desc { en: """List all created bridges""" @@ -112,8 +122,8 @@ emqx_bridge_api { desc_api7 { desc { - en: """Enable/Disable/Stop/Restart bridges on all nodes in the cluster.""" - zh: """在集群中的所有节点上启用/禁用/停止/重新启动 Bridge。""" + en: """Stop/Restart bridges on all nodes in the cluster.""" + zh: """""" } label: { en: "Cluster Bridge Operate" @@ -123,10 +133,8 @@ emqx_bridge_api { desc_api8 { desc { - en: """Stop/Restart bridges on a specific node. - NOTE: It's not allowed to disable/enable bridges on a single node.""" - zh: """在某个节点上停止/重新启动 Bridge。 -NOTE:不允许在单节点上启用/禁用 Bridge""" + en: """Stop/Restart bridges on a specific node.""" + zh: """在某个节点上停止/重新启动 Bridge。""" } label: { en: "Node Bridge Operate" @@ -144,4 +152,15 @@ NOTE:不允许在单节点上启用/禁用 Bridge""" zh: "" } } + + desc_enable_bridge { + desc { + en: """Enable or Disable bridges on all nodes in the cluster.""" + zh: """""" + } + label: { + en: "Cluster Bridge Enable" + zh: "" + } + } } diff --git a/apps/emqx_bridge/src/emqx_bridge_api.erl b/apps/emqx_bridge/src/emqx_bridge_api.erl index b05e31b11..f3247206e 100644 --- a/apps/emqx_bridge/src/emqx_bridge_api.erl +++ b/apps/emqx_bridge/src/emqx_bridge_api.erl @@ -36,6 +36,7 @@ -export([ '/bridges'/2, '/bridges/:id'/2, + '/bridges/:id/enable/:enable'/2, '/bridges/:id/:operation'/2, '/nodes/:node/bridges/:id/:operation'/2, '/bridges/:id/metrics'/2, @@ -67,6 +68,7 @@ paths() -> [ "/bridges", "/bridges/:id", + "/bridges/:id/enable/:enable", "/bridges/:id/:operation", "/nodes/:node/bridges/:id/:operation", "/bridges/:id/metrics", @@ -89,7 +91,7 @@ get_response_body_schema() -> param_path_operation_cluster() -> {operation, mk( - enum([enable, disable, stop, restart]), + enum([stop, restart]), #{ in => path, required => true, @@ -134,6 +136,17 @@ param_path_id() -> } )}. +param_path_enable() -> + {enable, + mk( + boolean(), + #{ + in => path, + desc => ?DESC("desc_param_path_enable"), + example => true + } + )}. + bridge_info_array_example(Method, WithMetrics) -> [Config || #{value := Config} <- maps:values(bridge_info_examples(Method, WithMetrics))]. @@ -367,12 +380,29 @@ schema("/bridges/:id/metrics/reset") -> } } }; +schema("/bridges/:id/enable/:enable") -> + #{ + 'operationId' => '/bridges/:id/enable/:enable', + put => + #{ + tags => [<<"bridges">>], + summary => <<"Enable or Disable Bridge">>, + desc => ?DESC("desc_enable_bridge"), + parameters => [param_path_id(), param_path_enable()], + responses => + #{ + 204 => <<"Success">>, + 400 => error_schema('INVALID_ID', "Bad bridge ID"), + 503 => error_schema('SERVICE_UNAVAILABLE', "Service unavailable") + } + } + }; schema("/bridges/:id/:operation") -> #{ 'operationId' => '/bridges/:id/:operation', post => #{ tags => [<<"bridges">>], - summary => <<"Enable/Disable/Stop/Restart Bridge">>, + summary => <<"Stop or Restart Bridge">>, description => ?DESC("desc_api7"), parameters => [ param_path_id(), @@ -515,6 +545,28 @@ lookup_from_local_node(BridgeType, BridgeName) -> Error -> Error end. +'/bridges/:id/enable/:enable'(put, #{bindings := #{id := Id, enable := Enable}}) -> + ?TRY_PARSE_ID( + Id, + case enable_func(Enable) of + invalid -> + {400, error_msg('BAD_REQUEST', <<"invalid operation">>)}; + OperFunc -> + case emqx_bridge:disable_enable(OperFunc, BridgeType, BridgeName) of + {ok, _} -> + {204}; + {error, {pre_config_update, _, bridge_not_found}} -> + {404, error_msg('NOT_FOUND', <<"bridge not found">>)}; + {error, {_, _, timeout}} -> + {503, error_msg('SERVICE_UNAVAILABLE', <<"request timeout">>)}; + {error, timeout} -> + {503, error_msg('SERVICE_UNAVAILABLE', <<"request timeout">>)}; + {error, Reason} -> + {500, error_msg('INTERNAL_ERROR', Reason)} + end + end + ). + '/bridges/:id/:operation'(post, #{ bindings := #{id := Id, operation := Op} @@ -524,19 +576,6 @@ lookup_from_local_node(BridgeType, BridgeName) -> case operation_func(Op) of invalid -> {400, error_msg('BAD_REQUEST', <<"invalid operation">>)}; - OperFunc when OperFunc == enable; OperFunc == disable -> - case emqx_bridge:disable_enable(OperFunc, BridgeType, BridgeName) of - {ok, _} -> - {200}; - {error, {pre_config_update, _, bridge_not_found}} -> - {404, error_msg('NOT_FOUND', <<"bridge not found">>)}; - {error, {_, _, timeout}} -> - {503, error_msg('SERVICE_UNAVAILABLE', <<"request timeout">>)}; - {error, timeout} -> - {503, error_msg('SERVICE_UNAVAILABLE', <<"request timeout">>)}; - {error, Reason} -> - {500, error_msg('INTERNAL_ERROR', Reason)} - end; OperFunc -> Nodes = mria_mnesia:running_nodes(), operation_to_all_nodes(Nodes, OperFunc, BridgeType, BridgeName) @@ -573,10 +612,12 @@ node_operation_func(_) -> invalid. operation_func(<<"stop">>) -> stop; operation_func(<<"restart">>) -> restart; -operation_func(<<"enable">>) -> enable; -operation_func(<<"disable">>) -> disable; operation_func(_) -> invalid. +enable_func(<<"true">>) -> enable; +enable_func(<<"false">>) -> disable; +enable_func(_) -> invalid. + operation_to_all_nodes(Nodes, OperFunc, BridgeType, BridgeName) -> RpcFunc = case OperFunc of diff --git a/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl index eb86b923f..4650ea1ad 100644 --- a/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl +++ b/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl @@ -498,19 +498,19 @@ t_enable_disable_bridges(Config) -> } = jsx:decode(Bridge), BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE, Name), %% disable it - {ok, 200, <<>>} = request(post, operation_path(cluster, disable, BridgeID), <<"">>), + {ok, 204, <<>>} = request(put, enable_path(false, BridgeID), <<"">>), {ok, 200, Bridge2} = request(get, uri(["bridges", BridgeID]), []), ?assertMatch(#{<<"status">> := <<"stopped">>}, jsx:decode(Bridge2)), %% enable again - {ok, 200, <<>>} = request(post, operation_path(cluster, enable, BridgeID), <<"">>), + {ok, 204, <<>>} = request(put, enable_path(true, BridgeID), <<"">>), {ok, 200, Bridge3} = request(get, uri(["bridges", BridgeID]), []), ?assertMatch(#{<<"status">> := <<"connected">>}, jsx:decode(Bridge3)), %% enable an already started bridge - {ok, 200, <<>>} = request(post, operation_path(cluster, enable, BridgeID), <<"">>), + {ok, 204, <<>>} = request(put, enable_path(true, BridgeID), <<"">>), {ok, 200, Bridge3} = request(get, uri(["bridges", BridgeID]), []), ?assertMatch(#{<<"status">> := <<"connected">>}, jsx:decode(Bridge3)), %% disable it again - {ok, 200, <<>>} = request(post, operation_path(cluster, disable, BridgeID), <<"">>), + {ok, 204, <<>>} = request(put, enable_path(false, BridgeID), <<"">>), {ok, 403, Res} = request(post, operation_path(node, restart, BridgeID), <<"">>), ?assertEqual( @@ -519,7 +519,7 @@ t_enable_disable_bridges(Config) -> ), %% enable a stopped bridge - {ok, 200, <<>>} = request(post, operation_path(cluster, enable, BridgeID), <<"">>), + {ok, 204, <<>>} = request(put, enable_path(true, BridgeID), <<"">>), {ok, 200, Bridge4} = request(get, uri(["bridges", BridgeID]), []), ?assertMatch(#{<<"status">> := <<"connected">>}, jsx:decode(Bridge4)), %% delete the bridge @@ -674,5 +674,8 @@ operation_path(node, Oper, BridgeID) -> operation_path(cluster, Oper, BridgeID) -> uri(["bridges", BridgeID, Oper]). +enable_path(Enable, BridgeID) -> + uri(["bridges", BridgeID, "enable", Enable]). + str(S) when is_list(S) -> S; str(S) when is_binary(S) -> binary_to_list(S). From a9844b33031abf65108f79c7ff11d22bb198e59f Mon Sep 17 00:00:00 2001 From: Stefan Strigler Date: Fri, 13 Jan 2023 10:04:29 +0100 Subject: [PATCH 14/22] chore: move changelog entries to v5.0.15 --- changes/v5.0.13-en.md | 4 ---- changes/v5.0.13-zh.md | 4 ---- changes/v5.0.15/feat-9569.en.md | 1 + changes/v5.0.15/feat-9569.zh.md | 1 + changes/v5.0.15/feat-9585.en.md | 1 + changes/v5.0.15/feat-9585.zh.md | 1 + 6 files changed, 4 insertions(+), 8 deletions(-) create mode 100644 changes/v5.0.15/feat-9569.en.md create mode 100644 changes/v5.0.15/feat-9569.zh.md create mode 100644 changes/v5.0.15/feat-9585.en.md create mode 100644 changes/v5.0.15/feat-9585.zh.md diff --git a/changes/v5.0.13-en.md b/changes/v5.0.13-en.md index 11c7705a4..1ed3c798b 100644 --- a/changes/v5.0.13-en.md +++ b/changes/v5.0.13-en.md @@ -18,10 +18,6 @@ - Ensure the default expiration time of `banned` is large enough [#9599](https://github.com/emqx/emqx/pull/9599/). -- `/bridges_probe` API endpoint to test params for creating a new data bridge [#9585](https://github.com/emqx/emqx/pull/9585). - -- Refactor `/authorization/sources/built_in_database/` by adding `rules/` to the path [#9569](https://github.com/emqx/emqx/pull/9569). - ## Bug fixes - Trigger `message.dropped` hook when QoS2 message is resend by client with a same packet id, or 'awaiting_rel' queue is full [#9487](https://github.com/emqx/emqx/pull/9487). diff --git a/changes/v5.0.13-zh.md b/changes/v5.0.13-zh.md index 8b556a6d8..41a1228f4 100644 --- a/changes/v5.0.13-zh.md +++ b/changes/v5.0.13-zh.md @@ -18,10 +18,6 @@ - 确保黑名单的默认超期时间足够长 [#9599](https://github.com/emqx/emqx/pull/9599/)。 -- 添加新 API 接口 `/bridges_probe` 用于测试创建桥接的参数是否可用 [#9585](https://github.com/emqx/emqx/pull/9585)。 - -- 重构 `/authorization/sources/built_in_database/` 接口,将 `rules/` 添加到了其路径中 [#9569](https://github.com/emqx/emqx/pull/9569)。 - ## 修复 - 当 QoS2 消息被重发(使用相同 Packet ID),或当 'awaiting_rel' 队列已满时,触发消息丢弃钩子(`message.dropped`)及计数器 [#9487](https://github.com/emqx/emqx/pull/9487)。 diff --git a/changes/v5.0.15/feat-9569.en.md b/changes/v5.0.15/feat-9569.en.md new file mode 100644 index 000000000..f3b70ec41 --- /dev/null +++ b/changes/v5.0.15/feat-9569.en.md @@ -0,0 +1 @@ +Refactor `/authorization/sources/built_in_database/` by adding `rules/` to the path. diff --git a/changes/v5.0.15/feat-9569.zh.md b/changes/v5.0.15/feat-9569.zh.md new file mode 100644 index 000000000..dd2e19c11 --- /dev/null +++ b/changes/v5.0.15/feat-9569.zh.md @@ -0,0 +1 @@ +重构 `/authorization/sources/built_in_database/` 接口,将 `rules/` 添加到了其路径中。 diff --git a/changes/v5.0.15/feat-9585.en.md b/changes/v5.0.15/feat-9585.en.md new file mode 100644 index 000000000..986cbb0c3 --- /dev/null +++ b/changes/v5.0.15/feat-9585.en.md @@ -0,0 +1 @@ +`/bridges_probe` API endpoint to test params for creating a new data bridge. diff --git a/changes/v5.0.15/feat-9585.zh.md b/changes/v5.0.15/feat-9585.zh.md new file mode 100644 index 000000000..82dd307ae --- /dev/null +++ b/changes/v5.0.15/feat-9585.zh.md @@ -0,0 +1 @@ +添加新 API 接口 `/bridges_probe` 用于测试创建桥接的参数是否可用。 From 0fd6865c41c8d66d5184c238eebf945b339b2c4f Mon Sep 17 00:00:00 2001 From: Erik Timan Date: Thu, 12 Jan 2023 19:51:25 +0100 Subject: [PATCH 15/22] chore: add changes --- changes/v5.0.15/feat-9736.en.md | 5 +++++ changes/v5.0.15/feat-9736.zh.md | 5 +++++ 2 files changed, 10 insertions(+) create mode 100644 changes/v5.0.15/feat-9736.en.md create mode 100644 changes/v5.0.15/feat-9736.zh.md diff --git a/changes/v5.0.15/feat-9736.en.md b/changes/v5.0.15/feat-9736.en.md new file mode 100644 index 000000000..59d7bd558 --- /dev/null +++ b/changes/v5.0.15/feat-9736.en.md @@ -0,0 +1,5 @@ +Refactor of /bridges API to make it more consistent with other APIs: +- bridge enable/disable is now done via the endpoint `/bridges/{id}/enable/[true,false]` +- `/bridges/{id}/operation/{operation}` endpoints are now `/bridges/{id}/{operation}` +- metrics are moved out from the GET `/bridges/{id}` response and can now be fetched via `/bridges/{id}/metrics` +- the `bridges/{id}/reset_metrics` endpoint is now `/bridges/{id}/metrics/reset` diff --git a/changes/v5.0.15/feat-9736.zh.md b/changes/v5.0.15/feat-9736.zh.md new file mode 100644 index 000000000..0107c8ab6 --- /dev/null +++ b/changes/v5.0.15/feat-9736.zh.md @@ -0,0 +1,5 @@ +重构部分 /bridges 的API 使得其和其他 API 能够更加一致: +- 桥接的启用和禁用现在是通过 `/bridges/{id}/enable/[true,false]` API 来实现的 +- 使用 `/bridges/{id}/{operation}` 替换了旧的 `/bridges/{id}/operation/{operation}` API +- 指标数据从 `/bridges/{id}` 的响应消息中移除,现在可以使用新的 API `/bridges/{id}/metrics` 进行访问 +- 使用 `/bridges/{id}/metrics/reset` 替换了旧的 `bridges/{id}/reset_metrics` API From f1c58c34ed124f597fdfbcf6d314233ba34a2262 Mon Sep 17 00:00:00 2001 From: Erik Timan Date: Fri, 13 Jan 2023 09:57:33 +0100 Subject: [PATCH 16/22] test(emqx_bridge): fix fetching of metrics in emqx_bridge_mqtt_SUITE --- .../test/emqx_bridge_mqtt_SUITE.erl | 38 ++++++++++--------- 1 file changed, 21 insertions(+), 17 deletions(-) diff --git a/apps/emqx_bridge/test/emqx_bridge_mqtt_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_mqtt_SUITE.erl index ae4fc4692..3040789b3 100644 --- a/apps/emqx_bridge/test/emqx_bridge_mqtt_SUITE.erl +++ b/apps/emqx_bridge/test/emqx_bridge_mqtt_SUITE.erl @@ -187,7 +187,7 @@ t_mqtt_conn_bridge_ingress(_) -> ), %% verify the metrics of the bridge - {ok, 200, BridgeStr} = request(get, uri(["bridges", BridgeIDIngress]), []), + {ok, 200, BridgeMetricsStr} = request(get, uri(["bridges", BridgeIDIngress, "metrics"]), []), ?assertMatch( #{ <<"metrics">> := #{<<"matched">> := 0, <<"received">> := 1}, @@ -200,7 +200,7 @@ t_mqtt_conn_bridge_ingress(_) -> } ] }, - jsx:decode(BridgeStr) + jsx:decode(BridgeMetricsStr) ), %% delete the bridge @@ -255,7 +255,7 @@ t_mqtt_conn_bridge_egress(_) -> ), %% verify the metrics of the bridge - {ok, 200, BridgeStr} = request(get, uri(["bridges", BridgeIDEgress]), []), + {ok, 200, BridgeMetricsStr} = request(get, uri(["bridges", BridgeIDEgress, "metrics"]), []), ?assertMatch( #{ <<"metrics">> := #{<<"matched">> := 1, <<"success">> := 1, <<"failed">> := 0}, @@ -268,7 +268,7 @@ t_mqtt_conn_bridge_egress(_) -> } ] }, - jsx:decode(BridgeStr) + jsx:decode(BridgeMetricsStr) ), %% delete the bridge @@ -354,7 +354,7 @@ t_mqtt_conn_bridge_ingress_and_egress(_) -> Payload = <<"hello">>, emqx:subscribe(RemoteTopic), - {ok, 200, BridgeStr1} = request(get, uri(["bridges", BridgeIDEgress]), []), + {ok, 200, BridgeMetricsStr1} = request(get, uri(["bridges", BridgeIDEgress, "metrics"]), []), #{ <<"metrics">> := #{ <<"matched">> := CntMatched1, <<"success">> := CntSuccess1, <<"failed">> := 0 @@ -371,7 +371,7 @@ t_mqtt_conn_bridge_ingress_and_egress(_) -> } } ] - } = jsx:decode(BridgeStr1), + } = jsx:decode(BridgeMetricsStr1), timer:sleep(100), %% PUBLISH a message to the 'local' broker, as we have only one broker, %% the remote broker is also the local one. @@ -393,7 +393,7 @@ t_mqtt_conn_bridge_ingress_and_egress(_) -> %% verify the metrics of the bridge timer:sleep(1000), - {ok, 200, BridgeStr2} = request(get, uri(["bridges", BridgeIDEgress]), []), + {ok, 200, BridgeMetricsStr2} = request(get, uri(["bridges", BridgeIDEgress, "metrics"]), []), #{ <<"metrics">> := #{ <<"matched">> := CntMatched2, <<"success">> := CntSuccess2, <<"failed">> := 0 @@ -410,7 +410,7 @@ t_mqtt_conn_bridge_ingress_and_egress(_) -> } } ] - } = jsx:decode(BridgeStr2), + } = jsx:decode(BridgeMetricsStr2), ?assertEqual(CntMatched2, CntMatched1 + 1), ?assertEqual(CntSuccess2, CntSuccess1 + 1), ?assertEqual(NodeCntMatched2, NodeCntMatched1 + 1), @@ -513,7 +513,7 @@ t_ingress_mqtt_bridge_with_rules(_) -> ), %% verify the metrics of the bridge - {ok, 200, BridgeStr} = request(get, uri(["bridges", BridgeIDIngress]), []), + {ok, 200, BridgeMetricsStr} = request(get, uri(["bridges", BridgeIDIngress, "metrics"]), []), ?assertMatch( #{ <<"metrics">> := #{<<"matched">> := 0, <<"received">> := 1}, @@ -526,7 +526,7 @@ t_ingress_mqtt_bridge_with_rules(_) -> } ] }, - jsx:decode(BridgeStr) + jsx:decode(BridgeMetricsStr) ), {ok, 204, <<>>} = request(delete, uri(["rules", RuleId]), []), @@ -627,7 +627,7 @@ t_egress_mqtt_bridge_with_rules(_) -> ), %% verify the metrics of the bridge - {ok, 200, BridgeStr} = request(get, uri(["bridges", BridgeIDEgress]), []), + {ok, 200, BridgeMetricsStr} = request(get, uri(["bridges", BridgeIDEgress, "metrics"]), []), ?assertMatch( #{ <<"metrics">> := #{<<"matched">> := 2, <<"success">> := 2, <<"failed">> := 0}, @@ -641,7 +641,7 @@ t_egress_mqtt_bridge_with_rules(_) -> } ] }, - jsx:decode(BridgeStr) + jsx:decode(BridgeMetricsStr) ), {ok, 204, <<>>} = request(delete, uri(["rules", RuleId]), []), @@ -693,7 +693,7 @@ t_mqtt_conn_bridge_egress_reconnect(_) -> assert_mqtt_msg_received(RemoteTopic, Payload0), %% verify the metrics of the bridge - {ok, 200, BridgeStr} = request(get, uri(["bridges", BridgeIDEgress]), []), + {ok, 200, BridgeMetricsStr} = request(get, uri(["bridges", BridgeIDEgress, "metrics"]), []), ?assertMatch( #{ <<"metrics">> := #{<<"matched">> := 1, <<"success">> := 1, <<"failed">> := 0}, @@ -706,7 +706,7 @@ t_mqtt_conn_bridge_egress_reconnect(_) -> } ] }, - jsx:decode(BridgeStr) + jsx:decode(BridgeMetricsStr) ), %% stop the listener 1883 to make the bridge disconnected @@ -740,7 +740,9 @@ t_mqtt_conn_bridge_egress_reconnect(_) -> %% verify the metrics of the bridge, the message should be queued {ok, 200, BridgeStr1} = request(get, uri(["bridges", BridgeIDEgress]), []), + {ok, 200, BridgeMetricsStr1} = request(get, uri(["bridges", BridgeIDEgress, "metrics"]), []), Decoded1 = jsx:decode(BridgeStr1), + DecodedMetrics1 = jsx:decode(BridgeMetricsStr1), ?assertMatch( Status when (Status == <<"connected">> orelse Status == <<"connecting">>), maps:get(<<"status">>, Decoded1) @@ -753,7 +755,7 @@ t_mqtt_conn_bridge_egress_reconnect(_) -> <<"failed">> := 0, <<"queuing">> := 2 } when Matched >= 3, - maps:get(<<"metrics">>, Decoded1) + maps:get(<<"metrics">>, DecodedMetrics1) ), %% start the listener 1883 to make the bridge reconnected @@ -761,10 +763,12 @@ t_mqtt_conn_bridge_egress_reconnect(_) -> timer:sleep(1500), %% verify the metrics of the bridge, the 2 queued messages should have been sent {ok, 200, BridgeStr2} = request(get, uri(["bridges", BridgeIDEgress]), []), + {ok, 200, BridgeMetricsStr2} = request(get, uri(["bridges", BridgeIDEgress, "metrics"]), []), + Decoded2 = jsx:decode(BridgeStr2), + ?assertEqual(<<"connected">>, maps:get(<<"status">>, Decoded2)), %% matched >= 3 because of possible retries. ?assertMatch( #{ - <<"status">> := <<"connected">>, <<"metrics">> := #{ <<"matched">> := Matched, <<"success">> := 3, @@ -773,7 +777,7 @@ t_mqtt_conn_bridge_egress_reconnect(_) -> <<"retried">> := _ } } when Matched >= 3, - jsx:decode(BridgeStr2) + jsx:decode(BridgeMetricsStr2) ), %% also verify the 2 messages have been sent to the remote broker assert_mqtt_msg_received(RemoteTopic, Payload1), From 8dd52e5a6e92ce46790f45ca713c4b698a414671 Mon Sep 17 00:00:00 2001 From: Erik Timan Date: Fri, 13 Jan 2023 10:16:27 +0100 Subject: [PATCH 17/22] chore: add translations to schemas --- apps/emqx_bridge/i18n/emqx_bridge_api.conf | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/apps/emqx_bridge/i18n/emqx_bridge_api.conf b/apps/emqx_bridge/i18n/emqx_bridge_api.conf index 8adda9355..9c9200aeb 100644 --- a/apps/emqx_bridge/i18n/emqx_bridge_api.conf +++ b/apps/emqx_bridge/i18n/emqx_bridge_api.conf @@ -3,7 +3,7 @@ emqx_bridge_api { desc_param_path_operation_cluster { desc { en: """Operations can be one of: start, stop, restart""" - zh: """""" + zh: """集群可用操作:启用、启动、重新启动""" } label: { en: "Cluster Operation" @@ -47,11 +47,11 @@ emqx_bridge_api { desc_param_path_enable { desc { en: """Whether or not the bridge is enabled""" - zh: """""" + zh: """是否启用桥接""" } label: { en: "Enable bridge" - zh: "" + zh: "启用桥接" } } desc_api1 { @@ -123,7 +123,7 @@ emqx_bridge_api { desc_api7 { desc { en: """Stop/Restart bridges on all nodes in the cluster.""" - zh: """""" + zh: """停止或启用所有节点上的桥接""" } label: { en: "Cluster Bridge Operate" @@ -145,22 +145,22 @@ emqx_bridge_api { desc_bridge_metrics { desc { en: """Get bridge metrics by Id""" - zh: """""" + zh: """通过 Id 来获取桥接的指标信息""" } label: { en: "Get Bridge Metrics" - zh: "" + zh: "获取桥接的指标" } } desc_enable_bridge { desc { en: """Enable or Disable bridges on all nodes in the cluster.""" - zh: """""" + zh: """启用或禁用所有节点上的桥接""" } label: { en: "Cluster Bridge Enable" - zh: "" + zh: "是否启用集群内的桥接" } } } From 7a17fb7308d80498af53926f9ee118b9ce89fa23 Mon Sep 17 00:00:00 2001 From: Erik Timan Date: Fri, 13 Jan 2023 13:50:37 +0100 Subject: [PATCH 18/22] test(emqx_ee_bridge): fix bridge enable/disable in kafka producer suite --- .../emqx_bridge_impl_kafka_producer_SUITE.erl | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_producer_SUITE.erl b/lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_producer_SUITE.erl index bdde21c76..8e5b1fa95 100644 --- a/lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_producer_SUITE.erl +++ b/lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_producer_SUITE.erl @@ -220,9 +220,10 @@ kafka_bridge_rest_api_helper(Config) -> BridgeIdUrlEnc = BridgeType ++ UrlEscColon ++ BridgeName, BridgesParts = ["bridges"], BridgesPartsIdDeleteAlsoActions = ["bridges", BridgeIdUrlEnc ++ "?also_delete_dep_actions"], - OpUrlFun = fun(OpName) -> ["bridges", BridgeIdUrlEnc, "operation", OpName] end, - BridgesPartsOpDisable = OpUrlFun("disable"), - BridgesPartsOpEnable = OpUrlFun("enable"), + OpUrlFun = fun(OpName) -> ["bridges", BridgeIdUrlEnc, OpName] end, + EnableFun = fun(Enable) -> ["bridges", BridgeIdUrlEnc, "enable", Enable] end, + BridgesPartsOpDisable = EnableFun("false"), + BridgesPartsOpEnable = EnableFun("true"), BridgesPartsOpRestart = OpUrlFun("restart"), BridgesPartsOpStop = OpUrlFun("stop"), %% List bridges @@ -321,10 +322,10 @@ kafka_bridge_rest_api_helper(Config) -> ?assertEqual(0, emqx_resource_metrics:retried_failed_get(ResourceId)), ?assertEqual(0, emqx_resource_metrics:retried_success_get(ResourceId)), %% Perform operations - {ok, 200, _} = show(http_post(show(BridgesPartsOpDisable), #{})), - {ok, 200, _} = show(http_post(show(BridgesPartsOpDisable), #{})), - {ok, 200, _} = show(http_post(show(BridgesPartsOpEnable), #{})), - {ok, 200, _} = show(http_post(show(BridgesPartsOpEnable), #{})), + {ok, 204, _} = show(http_put(show(BridgesPartsOpDisable), #{})), + {ok, 204, _} = show(http_put(show(BridgesPartsOpDisable), #{})), + {ok, 204, _} = show(http_put(show(BridgesPartsOpEnable), #{})), + {ok, 204, _} = show(http_put(show(BridgesPartsOpEnable), #{})), {ok, 200, _} = show(http_post(show(BridgesPartsOpStop), #{})), {ok, 200, _} = show(http_post(show(BridgesPartsOpStop), #{})), {ok, 200, _} = show(http_post(show(BridgesPartsOpRestart), #{})), From c3133fb6a22d50bf569946f7e3790b80af697f37 Mon Sep 17 00:00:00 2001 From: Erik Timan Date: Fri, 13 Jan 2023 14:49:41 +0100 Subject: [PATCH 19/22] fix(emqx_bridge): small fixes from review --- apps/emqx_bridge/i18n/emqx_bridge_api.conf | 4 ++-- apps/emqx_bridge/src/emqx_bridge_api.erl | 3 ++- apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl | 13 ++++++++----- 3 files changed, 12 insertions(+), 8 deletions(-) diff --git a/apps/emqx_bridge/i18n/emqx_bridge_api.conf b/apps/emqx_bridge/i18n/emqx_bridge_api.conf index 9c9200aeb..a5593f1cf 100644 --- a/apps/emqx_bridge/i18n/emqx_bridge_api.conf +++ b/apps/emqx_bridge/i18n/emqx_bridge_api.conf @@ -2,8 +2,8 @@ emqx_bridge_api { desc_param_path_operation_cluster { desc { - en: """Operations can be one of: start, stop, restart""" - zh: """集群可用操作:启用、启动、重新启动""" + en: """Operations can be one of: stop, restart""" + zh: """集群可用操作:停止、重新启动""" } label: { en: "Cluster Operation" diff --git a/apps/emqx_bridge/src/emqx_bridge_api.erl b/apps/emqx_bridge/src/emqx_bridge_api.erl index f3247206e..3af911d3d 100644 --- a/apps/emqx_bridge/src/emqx_bridge_api.erl +++ b/apps/emqx_bridge/src/emqx_bridge_api.erl @@ -107,7 +107,7 @@ param_path_operation_on_node() -> #{ in => path, required => true, - example => <<"start">>, + example => <<"stop">>, desc => ?DESC("desc_param_path_operation_on_node") } )}. @@ -142,6 +142,7 @@ param_path_enable() -> boolean(), #{ in => path, + required => true, desc => ?DESC("desc_param_path_enable"), example => true } diff --git a/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl index 4650ea1ad..82523a839 100644 --- a/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl +++ b/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl @@ -616,7 +616,6 @@ t_metrics(Config) -> %% check for empty bridge metrics {ok, 200, Bridge1Str} = request(get, uri(["bridges", BridgeID, "metrics"]), []), - ct:pal("HERE ~p", [jsx:decode(Bridge1Str)]), ?assertMatch( #{ <<"metrics">> := #{<<"success">> := 0}, @@ -625,6 +624,12 @@ t_metrics(Config) -> jsx:decode(Bridge1Str) ), + %% check that the bridge doesn't contain metrics anymore + {ok, 200, Bridge2Str} = request(get, uri(["bridges", BridgeID]), []), + Decoded = jsx:decode(Bridge2Str), + ?assertNot(maps:is_key(<<"metrics">>, Decoded)), + ?assertNot(maps:is_key(<<"node_metrics">>, Decoded)), + %% send an message to emqx and the message should be forwarded to the HTTP server Body = <<"my msg">>, emqx:publish(emqx_message:make(<<"emqx_webhook/1">>, Body)), @@ -645,19 +650,17 @@ t_metrics(Config) -> ), %% check for non-empty bridge metrics - {ok, 200, Bridge2Str} = request(get, uri(["bridges", BridgeID, "metrics"]), []), - ct:pal("HERE ~p", [jsx:decode(Bridge2Str)]), + {ok, 200, Bridge3Str} = request(get, uri(["bridges", BridgeID, "metrics"]), []), ?assertMatch( #{ <<"metrics">> := #{<<"success">> := 1}, <<"node_metrics">> := [_ | _] }, - jsx:decode(Bridge2Str) + jsx:decode(Bridge3Str) ), %% check for non-empty metrics when listing all bridges {ok, 200, BridgesStr} = request(get, uri(["bridges"]), []), - ct:pal("HERE ~p", [jsx:decode(BridgesStr)]), ?assertMatch( [ #{ From 61e98900be2841f03152ddc29416271bb77e314a Mon Sep 17 00:00:00 2001 From: Erik Timan Date: Fri, 13 Jan 2023 15:13:35 +0100 Subject: [PATCH 20/22] chore: bump app vsn of emqx_resource --- apps/emqx_resource/src/emqx_resource.app.src | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apps/emqx_resource/src/emqx_resource.app.src b/apps/emqx_resource/src/emqx_resource.app.src index 00389261b..e3a37fd10 100644 --- a/apps/emqx_resource/src/emqx_resource.app.src +++ b/apps/emqx_resource/src/emqx_resource.app.src @@ -1,7 +1,7 @@ %% -*- mode: erlang -*- {application, emqx_resource, [ {description, "Manager for all external resources"}, - {vsn, "0.1.4"}, + {vsn, "0.1.5"}, {registered, []}, {mod, {emqx_resource_app, []}}, {applications, [ From 8f5881d1a53cdcdd1a5de26d62c3075f8d001851 Mon Sep 17 00:00:00 2001 From: Stefan Strigler Date: Fri, 13 Jan 2023 16:19:35 +0100 Subject: [PATCH 21/22] fix: remove stale request/3 from merge error --- apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl | 3 --- 1 file changed, 3 deletions(-) diff --git a/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl index 981e2f547..4116a4754 100644 --- a/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl +++ b/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl @@ -652,9 +652,6 @@ t_bridges_probe(Config) -> ?assertMatch(#{<<"code">> := <<"BAD_REQUEST">>}, jsx:decode(BadReq)), ok. -request(Method, Url, Body) -> - request(<<"bridge_admin">>, Method, Url, Body). - t_metrics(Config) -> Port = ?config(port, Config), %% assert we there's no bridges at first From 6fe09447ed45f2d6a7c2d7aa0ca9d51f3c6e4d34 Mon Sep 17 00:00:00 2001 From: Stefan Strigler Date: Fri, 13 Jan 2023 17:23:25 +0100 Subject: [PATCH 22/22] fix: stale test using old resource paths after merge --- apps/emqx_authz/test/emqx_authz_api_mnesia_SUITE.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apps/emqx_authz/test/emqx_authz_api_mnesia_SUITE.erl b/apps/emqx_authz/test/emqx_authz_api_mnesia_SUITE.erl index 3ee3ba4d8..2aa2d9545 100644 --- a/apps/emqx_authz/test/emqx_authz_api_mnesia_SUITE.erl +++ b/apps/emqx_authz/test/emqx_authz_api_mnesia_SUITE.erl @@ -168,7 +168,7 @@ t_api(_) -> {ok, 204, _} = request( post, - uri(["authorization", "sources", "built_in_database", "username"]), + uri(["authorization", "sources", "built_in_database", "rules", "users"]), [?USERNAME_RULES_EXAMPLE] ),