diff --git a/apps/emqx_bridge/test/emqx_bridge_v2_testlib.erl b/apps/emqx_bridge/test/emqx_bridge_v2_testlib.erl index a6b92caaa..e56ead313 100644 --- a/apps/emqx_bridge/test/emqx_bridge_v2_testlib.erl +++ b/apps/emqx_bridge/test/emqx_bridge_v2_testlib.erl @@ -216,12 +216,8 @@ create_bridge_api(Config, Overrides) -> BridgeName = ?config(bridge_name, Config), BridgeConfig0 = ?config(bridge_config, Config), BridgeConfig = emqx_utils_maps:deep_merge(BridgeConfig0, Overrides), - ConnectorName = ?config(connector_name, Config), - ConnectorType = ?config(connector_type, Config), - ConnectorConfig = ?config(connector_config, Config), - {ok, _Connector} = - emqx_connector:create(ConnectorType, ConnectorName, ConnectorConfig), + {ok, {{_, 201, _}, _, _}} = create_connector_api(Config), Params = BridgeConfig#{<<"type">> => BridgeType, <<"name">> => BridgeName}, Path = emqx_mgmt_api_test_util:api_path(["actions"]), diff --git a/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_producer_action_info.erl b/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_producer_action_info.erl index 6b5391b09..c0c0b8e66 100644 --- a/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_producer_action_info.erl +++ b/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_producer_action_info.erl @@ -27,10 +27,14 @@ bridge_v1_config_to_action_config(BridgeV1Config, ConnectorName) -> ParamsKeys = producer_action_parameters_field_keys(), Config1 = maps:with(CommonActionKeys, BridgeV1Config), Params = maps:with(ParamsKeys, BridgeV1Config), - Config1#{ - <<"connector">> => ConnectorName, - <<"parameters">> => Params - }. + emqx_utils_maps:update_if_present( + <<"resource_opts">>, + fun emqx_bridge_v2_schema:project_to_actions_resource_opts/1, + Config1#{ + <<"connector">> => ConnectorName, + <<"parameters">> => Params + } + ). %%------------------------------------------------------------------------------------------ %% Internal helper fns diff --git a/apps/emqx_bridge_mongodb/src/emqx_bridge_mongodb.erl b/apps/emqx_bridge_mongodb/src/emqx_bridge_mongodb.erl index 61d3cec61..c02c47155 100644 --- a/apps/emqx_bridge_mongodb/src/emqx_bridge_mongodb.erl +++ b/apps/emqx_bridge_mongodb/src/emqx_bridge_mongodb.erl @@ -107,7 +107,10 @@ fields(Field) when Field == "put_connector"; Field == "post_connector" -> - emqx_connector_schema:api_fields(Field, ?CONNECTOR_TYPE, fields("connection_fields")); + Fields = + fields("connection_fields") ++ + emqx_connector_schema:resource_opts_ref(?MODULE, connector_resource_opts), + emqx_connector_schema:api_fields(Field, ?CONNECTOR_TYPE, Fields); fields("get_bridge_v2") -> emqx_bridge_schema:status_fields() ++ fields("post_bridge_v2"); diff --git a/apps/emqx_bridge_mongodb/src/emqx_bridge_mongodb_action_info.erl b/apps/emqx_bridge_mongodb/src/emqx_bridge_mongodb_action_info.erl index 060e6a17a..02138adbe 100644 --- a/apps/emqx_bridge_mongodb/src/emqx_bridge_mongodb_action_info.erl +++ b/apps/emqx_bridge_mongodb/src/emqx_bridge_mongodb_action_info.erl @@ -30,7 +30,11 @@ bridge_v1_config_to_action_config(BridgeV1Config, ConnectorName) -> ActionParametersKeys = schema_keys(action_parameters), ActionKeys = ActionTopLevelKeys ++ ActionParametersKeys, ActionConfig = make_config_map(ActionKeys, ActionParametersKeys, BridgeV1Config), - ActionConfig#{<<"connector">> => ConnectorName}. + emqx_utils_maps:update_if_present( + <<"resource_opts">>, + fun emqx_bridge_v2_schema:project_to_actions_resource_opts/1, + ActionConfig#{<<"connector">> => ConnectorName} + ). bridge_v1_config_to_connector_config(BridgeV1Config) -> ActionTopLevelKeys = schema_keys(mongodb_action), @@ -42,10 +46,7 @@ bridge_v1_config_to_connector_config(BridgeV1Config) -> ConnConfig0 = make_config_map(ConnectorKeys, ConnectorParametersKeys, BridgeV1Config), emqx_utils_maps:update_if_present( <<"resource_opts">>, - fun(ResourceOpts) -> - CommonROSubfields = emqx_connector_schema:common_resource_opts_subfields_bin(), - maps:with(CommonROSubfields, ResourceOpts) - end, + fun emqx_connector_schema:project_to_connector_resource_opts/1, ConnConfig0 ). diff --git a/apps/emqx_bridge_mongodb/test/emqx_bridge_mongodb_SUITE.erl b/apps/emqx_bridge_mongodb/test/emqx_bridge_mongodb_SUITE.erl index d87e1665f..b0c249217 100644 --- a/apps/emqx_bridge_mongodb/test/emqx_bridge_mongodb_SUITE.erl +++ b/apps/emqx_bridge_mongodb/test/emqx_bridge_mongodb_SUITE.erl @@ -58,10 +58,10 @@ init_per_group(Type = rs, Config) -> MongoPort = list_to_integer(os:getenv("MONGO_RS_PORT", "27017")), case emqx_common_test_helpers:is_tcp_server_available(MongoHost, MongoPort) of true -> - ok = start_apps(), - emqx_mgmt_api_test_util:init_suite(), + Apps = start_apps(Config), {Name, MongoConfig} = mongo_config(MongoHost, MongoPort, Type, Config), [ + {apps, Apps}, {mongo_host, MongoHost}, {mongo_port, MongoPort}, {mongo_config, MongoConfig}, @@ -77,10 +77,10 @@ init_per_group(Type = sharded, Config) -> MongoPort = list_to_integer(os:getenv("MONGO_SHARDED_PORT", "27017")), case emqx_common_test_helpers:is_tcp_server_available(MongoHost, MongoPort) of true -> - ok = start_apps(), - emqx_mgmt_api_test_util:init_suite(), + Apps = start_apps(Config), {Name, MongoConfig} = mongo_config(MongoHost, MongoPort, Type, Config), [ + {apps, Apps}, {mongo_host, MongoHost}, {mongo_port, MongoPort}, {mongo_config, MongoConfig}, @@ -96,8 +96,7 @@ init_per_group(Type = single, Config) -> MongoPort = list_to_integer(os:getenv("MONGO_SINGLE_PORT", "27017")), case emqx_common_test_helpers:is_tcp_server_available(MongoHost, MongoPort) of true -> - ok = start_apps(), - emqx_mgmt_api_test_util:init_suite(), + Apps = start_apps(Config), %% NOTE: `mongo-single` has auth enabled, see `credentials.env`. AuthSource = bin(os:getenv("MONGO_AUTHSOURCE", "admin")), Username = bin(os:getenv("MONGO_USERNAME", "")), @@ -113,6 +112,7 @@ init_per_group(Type = single, Config) -> ], {Name, MongoConfig} = mongo_config(MongoHost, MongoPort, Type, NConfig), [ + {apps, Apps}, {mongo_host, MongoHost}, {mongo_port, MongoPort}, {mongo_config, MongoConfig}, @@ -124,6 +124,14 @@ init_per_group(Type = single, Config) -> {skip, no_mongo} end. +end_per_group(Type, Config) when + Type =:= rs; + Type =:= sharded; + Type =:= single +-> + Apps = ?config(apps, Config), + emqx_cth_suite:stop(Apps), + ok; end_per_group(_Type, _Config) -> ok. @@ -131,18 +139,6 @@ init_per_suite(Config) -> Config. end_per_suite(_Config) -> - emqx_mgmt_api_test_util:end_suite(), - ok = emqx_common_test_helpers:stop_apps( - [ - emqx_management, - emqx_bridge_mongodb, - emqx_mongodb, - emqx_bridge, - emqx_connector, - emqx_rule_engine, - emqx_conf - ] - ), ok. init_per_testcase(_Testcase, Config) -> @@ -162,23 +158,22 @@ end_per_testcase(_Testcase, Config) -> %% Helper fns %%------------------------------------------------------------------------------ -start_apps() -> - ensure_loaded(), - %% some configs in emqx_conf app are mandatory, - %% we want to make sure they are loaded before - %% ekka start in emqx_common_test_helpers:start_apps/1 - emqx_common_test_helpers:render_and_load_app_config(emqx_conf), - ok = emqx_common_test_helpers:start_apps( +start_apps(Config) -> + Apps = emqx_cth_suite:start( [ + emqx, emqx_conf, - emqx_rule_engine, emqx_connector, emqx_bridge, - emqx_mongodb, emqx_bridge_mongodb, - emqx_management - ] - ). + emqx_rule_engine, + emqx_management, + {emqx_dashboard, "dashboard.listeners.http { enable = true, bind = 18083 }"} + ], + #{work_dir => emqx_cth_suite:work_dir(Config)} + ), + {ok, _Api} = emqx_common_test_http:create_default_app(), + Apps. ensure_loaded() -> _ = application:load(emqtt), @@ -221,6 +216,15 @@ mongo_config(MongoHost, MongoPort0, rs = Type, Config) -> "\n resource_opts = {" "\n query_mode = ~s" "\n worker_pool_size = 1" + "\n health_check_interval = 15s" + "\n start_timeout = 5s" + "\n start_after_created = true" + "\n request_ttl = 45s" + "\n inflight_window = 100" + "\n max_buffer_bytes = 256MB" + "\n buffer_mode = memory_only" + "\n metrics_flush_interval = 5s" + "\n resume_interval = 15s" "\n }" "\n }", [ @@ -248,6 +252,15 @@ mongo_config(MongoHost, MongoPort0, sharded = Type, Config) -> "\n resource_opts = {" "\n query_mode = ~s" "\n worker_pool_size = 1" + "\n health_check_interval = 15s" + "\n start_timeout = 5s" + "\n start_after_created = true" + "\n request_ttl = 45s" + "\n inflight_window = 100" + "\n max_buffer_bytes = 256MB" + "\n buffer_mode = memory_only" + "\n metrics_flush_interval = 5s" + "\n resume_interval = 15s" "\n }" "\n }", [ @@ -278,6 +291,15 @@ mongo_config(MongoHost, MongoPort0, single = Type, Config) -> "\n resource_opts = {" "\n query_mode = ~s" "\n worker_pool_size = 1" + "\n health_check_interval = 15s" + "\n start_timeout = 5s" + "\n start_after_created = true" + "\n request_ttl = 45s" + "\n inflight_window = 100" + "\n max_buffer_bytes = 256MB" + "\n buffer_mode = memory_only" + "\n metrics_flush_interval = 5s" + "\n resume_interval = 15s" "\n }" "\n }", [ diff --git a/apps/emqx_bridge_mongodb/test/emqx_bridge_v2_mongodb_SUITE.erl b/apps/emqx_bridge_mongodb/test/emqx_bridge_v2_mongodb_SUITE.erl index 9fd13c50b..879b1d375 100644 --- a/apps/emqx_bridge_mongodb/test/emqx_bridge_v2_mongodb_SUITE.erl +++ b/apps/emqx_bridge_mongodb/test/emqx_bridge_v2_mongodb_SUITE.erl @@ -144,7 +144,12 @@ connector_config(Name, Config) -> <<"srv_record">> => false, <<"username">> => Username, <<"password">> => iolist_to_binary(["file://", PassFile]), - <<"auth_source">> => AuthSource + <<"auth_source">> => AuthSource, + <<"resource_opts">> => #{ + <<"health_check_interval">> => <<"15s">>, + <<"start_after_created">> => true, + <<"start_timeout">> => <<"5s">> + } }, InnerConfigMap = serde_roundtrip(InnerConfigMap0), parse_and_check_connector_config(InnerConfigMap, Name). @@ -166,8 +171,21 @@ bridge_config(Name, ConnectorId) -> <<"connector">> => ConnectorId, <<"parameters">> => #{}, - <<"local_topic">> => <<"t/aeh">> - %%, + <<"local_topic">> => <<"t/mongo">>, + <<"resource_opts">> => #{ + <<"batch_size">> => 1, + <<"batch_time">> => <<"0ms">>, + <<"buffer_mode">> => <<"memory_only">>, + <<"buffer_seg_bytes">> => <<"10MB">>, + <<"health_check_interval">> => <<"15s">>, + <<"inflight_window">> => 100, + <<"max_buffer_bytes">> => <<"256MB">>, + <<"metrics_flush_interval">> => <<"1s">>, + <<"query_mode">> => <<"sync">>, + <<"request_ttl">> => <<"45s">>, + <<"resume_interval">> => <<"15s">>, + <<"worker_pool_size">> => <<"1">> + } }, InnerConfigMap = serde_roundtrip(InnerConfigMap0), parse_and_check_bridge_config(InnerConfigMap, Name). diff --git a/apps/emqx_bridge_mysql/src/emqx_bridge_mysql_action_info.erl b/apps/emqx_bridge_mysql/src/emqx_bridge_mysql_action_info.erl index 31817b02f..99553c674 100644 --- a/apps/emqx_bridge_mysql/src/emqx_bridge_mysql_action_info.erl +++ b/apps/emqx_bridge_mysql/src/emqx_bridge_mysql_action_info.erl @@ -44,15 +44,17 @@ bridge_v1_config_to_action_config(BridgeV1Config, ConnectorName) -> ActionParametersKeys = schema_keys(action_parameters), ActionKeys = ActionTopLevelKeys ++ ActionParametersKeys, ActionConfig = make_config_map(ActionKeys, ActionParametersKeys, BridgeV1Config), - ActionConfig#{<<"connector">> => ConnectorName}. + emqx_utils_maps:update_if_present( + <<"resource_opts">>, + fun emqx_bridge_v2_schema:project_to_actions_resource_opts/1, + ActionConfig#{<<"connector">> => ConnectorName} + ). bridge_v1_config_to_connector_config(BridgeV1Config) -> ConnectorKeys = schema_keys("config_connector"), - ResourceOptsKeys = schema_keys(connector_resource_opts), - maps:update_with( + emqx_utils_maps:update_if_present( <<"resource_opts">>, - fun(ResourceOpts) -> maps:with(ResourceOptsKeys, ResourceOpts) end, - #{}, + fun emqx_connector_schema:project_to_connector_resource_opts/1, maps:with(ConnectorKeys, BridgeV1Config) ). diff --git a/apps/emqx_bridge_mysql/test/emqx_bridge_mysql_SUITE.erl b/apps/emqx_bridge_mysql/test/emqx_bridge_mysql_SUITE.erl index 65413817d..51cdc573f 100644 --- a/apps/emqx_bridge_mysql/test/emqx_bridge_mysql_SUITE.erl +++ b/apps/emqx_bridge_mysql/test/emqx_bridge_mysql_SUITE.erl @@ -182,6 +182,15 @@ mysql_config(BridgeType, Config) -> " batch_size = ~b\n" " query_mode = ~s\n" " worker_pool_size = ~b\n" + " health_check_interval = 15s\n" + " start_timeout = 5s\n" + " inflight_window = 100\n" + " max_buffer_bytes = 256MB\n" + " buffer_mode = memory_only\n" + " batch_time = 0\n" + " metrics_flush_interval = 5s\n" + " buffer_seg_bytes = 10MB\n" + " start_after_created = true\n" " }\n" " ssl = {\n" " enable = ~w\n" diff --git a/apps/emqx_bridge_pgsql/test/emqx_bridge_pgsql_SUITE.erl b/apps/emqx_bridge_pgsql/test/emqx_bridge_pgsql_SUITE.erl index e72e73e34..0cefd6af4 100644 --- a/apps/emqx_bridge_pgsql/test/emqx_bridge_pgsql_SUITE.erl +++ b/apps/emqx_bridge_pgsql/test/emqx_bridge_pgsql_SUITE.erl @@ -100,11 +100,18 @@ init_per_group(timescale, Config0) -> init_per_group(_Group, Config) -> Config. -end_per_group(Group, Config) when Group =:= with_batch; Group =:= without_batch -> - connect_and_drop_table(Config), +end_per_group(Group, Config) when + Group =:= with_batch; + Group =:= without_batch; + Group =:= matrix; + Group =:= timescale +-> + Apps = ?config(apps, Config), ProxyHost = ?config(proxy_host, Config), ProxyPort = ?config(proxy_port, Config), + connect_and_drop_table(Config), emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort), + ok = emqx_cth_suite:stop(Apps), ok; end_per_group(_Group, _Config) -> ok. @@ -113,8 +120,6 @@ init_per_suite(Config) -> Config. end_per_suite(_Config) -> - emqx_mgmt_api_test_util:end_suite(), - ok = emqx_common_test_helpers:stop_apps([emqx, emqx_postgresql, emqx_conf, emqx_bridge]), ok. init_per_testcase(_Testcase, Config) -> @@ -147,14 +152,31 @@ common_init(Config0) -> ProxyPort = list_to_integer(os:getenv("PROXY_PORT", "8474")), emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort), % Ensure enterprise bridge module is loaded - ok = emqx_common_test_helpers:start_apps([emqx, emqx_postgresql, emqx_conf, emqx_bridge]), - _ = emqx_bridge_enterprise:module_info(), - emqx_mgmt_api_test_util:init_suite(), + Apps = emqx_cth_suite:start( + [ + emqx, + emqx_conf, + emqx_connector, + emqx_bridge, + emqx_bridge_pgsql, + emqx_rule_engine, + emqx_management, + {emqx_dashboard, "dashboard.listeners.http { enable = true, bind = 18083 }"} + ], + #{work_dir => emqx_cth_suite:work_dir(Config0)} + ), + {ok, _Api} = emqx_common_test_http:create_default_app(), + + %% ok = emqx_common_test_helpers:start_apps([emqx, emqx_postgresql, emqx_conf, emqx_bridge]), + %% _ = emqx_bridge_enterprise:module_info(), + %% emqx_mgmt_api_test_util:init_suite(), + % Connect to pgsql directly and create the table connect_and_create_table(Config0), {Name, PGConf} = pgsql_config(BridgeType, Config0), Config = [ + {apps, Apps}, {pgsql_config, PGConf}, {pgsql_bridge_type, BridgeType}, {pgsql_name, Name}, @@ -198,6 +220,16 @@ pgsql_config(BridgeType, Config) -> "\n request_ttl = 500ms" "\n batch_size = ~b" "\n query_mode = ~s" + "\n worker_pool_size = 1" + "\n health_check_interval = 15s" + "\n start_after_created = true" + "\n start_timeout = 5s" + "\n inflight_window = 100" + "\n max_buffer_bytes = 256MB" + "\n buffer_seg_bytes = 10MB" + "\n buffer_mode = memory_only" + "\n metrics_flush_interval = 5s" + "\n resume_interval = 15s" "\n }" "\n ssl = {" "\n enable = ~w" @@ -218,6 +250,9 @@ pgsql_config(BridgeType, Config) -> ), {Name, parse_and_check(ConfigString, BridgeType, Name)}. +default_sql() -> + ?SQL_BRIDGE. + create_passfile(BridgeType, Config) -> Filename = binary_to_list(BridgeType) ++ ".passfile", Filepath = filename:join(?config(priv_dir, Config), Filename), @@ -689,14 +724,13 @@ t_missing_table(Config) -> t_table_removed(Config) -> Name = ?config(pgsql_name, Config), BridgeType = ?config(pgsql_bridge_type, Config), - %%ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name), ?check_trace( begin connect_and_create_table(Config), ?assertMatch({ok, _}, create_bridge(Config)), ?retry( - _Sleep = 1_000, - _Attempts = 20, + _Sleep = 100, + _Attempts = 200, ?assertMatch(#{status := connected}, emqx_bridge_v2:health_check(BridgeType, Name)) ), connect_and_drop_table(Config), diff --git a/apps/emqx_bridge_pgsql/test/emqx_bridge_v2_pgsql_SUITE.erl b/apps/emqx_bridge_pgsql/test/emqx_bridge_v2_pgsql_SUITE.erl new file mode 100644 index 000000000..d077cece7 --- /dev/null +++ b/apps/emqx_bridge_pgsql/test/emqx_bridge_v2_pgsql_SUITE.erl @@ -0,0 +1,233 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- +-module(emqx_bridge_v2_pgsql_SUITE). + +-compile(nowarn_export_all). +-compile(export_all). + +-include_lib("eunit/include/eunit.hrl"). +-include_lib("common_test/include/ct.hrl"). + +-define(BRIDGE_TYPE, pgsql). +-define(BRIDGE_TYPE_BIN, <<"pgsql">>). +-define(CONNECTOR_TYPE, pgsql). +-define(CONNECTOR_TYPE_BIN, <<"pgsql">>). + +-import(emqx_common_test_helpers, [on_exit/1]). +-import(emqx_utils_conv, [bin/1]). + +%%------------------------------------------------------------------------------ +%% CT boilerplate +%%------------------------------------------------------------------------------ + +all() -> + emqx_common_test_helpers:all(?MODULE). + +init_per_suite(Config) -> + PostgresHost = os:getenv("PGSQL_TCP_HOST", "toxiproxy"), + PostgresPort = list_to_integer(os:getenv("PGSQL_TCP_PORT", "5432")), + case emqx_common_test_helpers:is_tcp_server_available(PostgresHost, PostgresPort) of + true -> + Apps = emqx_cth_suite:start( + [ + emqx, + emqx_conf, + emqx_connector, + emqx_bridge, + emqx_bridge_pgsql, + emqx_rule_engine, + emqx_management, + {emqx_dashboard, "dashboard.listeners.http { enable = true, bind = 18083 }"} + ], + #{work_dir => emqx_cth_suite:work_dir(Config)} + ), + {ok, Api} = emqx_common_test_http:create_default_app(), + NConfig = [ + {apps, Apps}, + {api, Api}, + {pgsql_host, PostgresHost}, + {pgsql_port, PostgresPort}, + {enable_tls, false}, + {postgres_host, PostgresHost}, + {postgres_port, PostgresPort} + | Config + ], + emqx_bridge_pgsql_SUITE:connect_and_create_table(NConfig), + NConfig; + false -> + case os:getenv("IS_CI") of + "yes" -> + throw(no_postgres); + _ -> + {skip, no_postgres} + end + end. + +end_per_suite(Config) -> + Apps = ?config(apps, Config), + emqx_cth_suite:stop(Apps), + ok. + +init_per_testcase(TestCase, Config) -> + common_init_per_testcase(TestCase, Config). + +common_init_per_testcase(TestCase, Config) -> + ct:timetrap(timer:seconds(60)), + emqx_bridge_v2_testlib:delete_all_bridges_and_connectors(), + emqx_config:delete_override_conf_files(), + UniqueNum = integer_to_binary(erlang:unique_integer()), + Name = iolist_to_binary([atom_to_binary(TestCase), UniqueNum]), + Username = <<"root">>, + Password = <<"public">>, + Passfile = filename:join(?config(priv_dir, Config), "passfile"), + ok = file:write_file(Passfile, Password), + NConfig = [ + {postgres_username, Username}, + {postgres_password, Password}, + {postgres_passfile, Passfile} + | Config + ], + ConnectorConfig = connector_config(Name, NConfig), + BridgeConfig = bridge_config(Name, Name), + ok = snabbkaffe:start_trace(), + [ + {connector_type, ?CONNECTOR_TYPE}, + {connector_name, Name}, + {connector_config, ConnectorConfig}, + {bridge_type, ?BRIDGE_TYPE}, + {bridge_name, Name}, + {bridge_config, BridgeConfig} + | NConfig + ]. + +end_per_testcase(_Testcase, Config) -> + case proplists:get_bool(skip_does_not_apply, Config) of + true -> + ok; + false -> + emqx_bridge_pgsql_SUITE:connect_and_clear_table(Config), + emqx_bridge_v2_testlib:delete_all_bridges_and_connectors(), + emqx_common_test_helpers:call_janitor(60_000), + ok = snabbkaffe:stop(), + ok + end. + +%%------------------------------------------------------------------------------ +%% Helper fns +%%------------------------------------------------------------------------------ + +connector_config(Name, Config) -> + PostgresHost = ?config(postgres_host, Config), + PostgresPort = ?config(postgres_port, Config), + Username = ?config(postgres_username, Config), + PassFile = ?config(postgres_passfile, Config), + InnerConfigMap0 = + #{ + <<"enable">> => true, + <<"database">> => <<"mqtt">>, + <<"server">> => iolist_to_binary([PostgresHost, ":", integer_to_binary(PostgresPort)]), + <<"pool_size">> => 8, + <<"username">> => Username, + <<"password">> => iolist_to_binary(["file://", PassFile]), + <<"resource_opts">> => #{ + <<"health_check_interval">> => <<"15s">>, + <<"start_after_created">> => true, + <<"start_timeout">> => <<"5s">> + } + }, + InnerConfigMap = serde_roundtrip(InnerConfigMap0), + parse_and_check_connector_config(InnerConfigMap, Name). + +parse_and_check_connector_config(InnerConfigMap, Name) -> + TypeBin = ?CONNECTOR_TYPE_BIN, + RawConf = #{<<"connectors">> => #{TypeBin => #{Name => InnerConfigMap}}}, + #{<<"connectors">> := #{TypeBin := #{Name := Config}}} = + hocon_tconf:check_plain(emqx_connector_schema, RawConf, #{ + required => false, atom_key => false + }), + ct:pal("parsed config: ~p", [Config]), + InnerConfigMap. + +bridge_config(Name, ConnectorId) -> + InnerConfigMap0 = + #{ + <<"enable">> => true, + <<"connector">> => ConnectorId, + <<"parameters">> => + #{<<"sql">> => emqx_bridge_pgsql_SUITE:default_sql()}, + <<"local_topic">> => <<"t/postgres">>, + <<"resource_opts">> => #{ + <<"batch_size">> => 1, + <<"batch_time">> => <<"0ms">>, + <<"buffer_mode">> => <<"memory_only">>, + <<"buffer_seg_bytes">> => <<"10MB">>, + <<"health_check_interval">> => <<"15s">>, + <<"inflight_window">> => 100, + <<"max_buffer_bytes">> => <<"256MB">>, + <<"metrics_flush_interval">> => <<"1s">>, + <<"query_mode">> => <<"sync">>, + <<"request_ttl">> => <<"45s">>, + <<"resume_interval">> => <<"15s">>, + <<"worker_pool_size">> => <<"1">> + } + }, + InnerConfigMap = serde_roundtrip(InnerConfigMap0), + parse_and_check_bridge_config(InnerConfigMap, Name). + +%% check it serializes correctly +serde_roundtrip(InnerConfigMap0) -> + IOList = hocon_pp:do(InnerConfigMap0, #{}), + {ok, InnerConfigMap} = hocon:binary(IOList), + InnerConfigMap. + +parse_and_check_bridge_config(InnerConfigMap, Name) -> + TypeBin = ?BRIDGE_TYPE_BIN, + RawConf = #{<<"bridges">> => #{TypeBin => #{Name => InnerConfigMap}}}, + hocon_tconf:check_plain(emqx_bridge_v2_schema, RawConf, #{required => false, atom_key => false}), + InnerConfigMap. + +make_message() -> + ClientId = emqx_guid:to_hexstr(emqx_guid:gen()), + Payload = emqx_guid:to_hexstr(emqx_guid:gen()), + #{ + clientid => ClientId, + payload => Payload, + timestamp => 1668602148000 + }. + +%%------------------------------------------------------------------------------ +%% Testcases +%%------------------------------------------------------------------------------ + +t_start_stop(Config) -> + emqx_bridge_v2_testlib:t_start_stop(Config, postgres_stopped), + ok. + +t_create_via_http(Config) -> + emqx_bridge_v2_testlib:t_create_via_http(Config), + ok. + +t_on_get_status(Config) -> + emqx_bridge_v2_testlib:t_on_get_status(Config, #{failure_status => connecting}), + ok. + +t_sync_query(Config) -> + ok = emqx_bridge_v2_testlib:t_sync_query( + Config, + fun make_message/0, + fun(Res) -> ?assertMatch({ok, _}, Res) end, + postgres_bridge_connector_on_query_return + ), + ok. diff --git a/apps/emqx_bridge_redis/src/emqx_bridge_redis_action_info.erl b/apps/emqx_bridge_redis/src/emqx_bridge_redis_action_info.erl index 6ead37170..b6b2ba012 100644 --- a/apps/emqx_bridge_redis/src/emqx_bridge_redis_action_info.erl +++ b/apps/emqx_bridge_redis/src/emqx_bridge_redis_action_info.erl @@ -44,12 +44,11 @@ bridge_v1_config_to_action_config(BridgeV1Config, ConnectorName) -> ActionParametersKeys = schema_keys(emqx_bridge_redis:fields(action_parameters)), ActionKeys = ActionTopLevelKeys ++ ActionParametersKeys, ActionConfig0 = make_config_map(ActionKeys, ActionParametersKeys, BridgeV1Config), - ActionConfig = emqx_utils_maps:update_if_present( + emqx_utils_maps:update_if_present( <<"resource_opts">>, fun emqx_bridge_v2_schema:project_to_actions_resource_opts/1, - ActionConfig0 - ), - ActionConfig#{<<"connector">> => ConnectorName}. + ActionConfig0#{<<"connector">> => ConnectorName} + ). bridge_v1_config_to_connector_config(BridgeV1Config) -> ActionTopLevelKeys = schema_keys(?SCHEMA_MODULE:fields(redis_action)), diff --git a/apps/emqx_bridge_redis/test/emqx_bridge_redis_SUITE.erl b/apps/emqx_bridge_redis/test/emqx_bridge_redis_SUITE.erl index 508051f93..67480371d 100644 --- a/apps/emqx_bridge_redis/test/emqx_bridge_redis_SUITE.erl +++ b/apps/emqx_bridge_redis/test/emqx_bridge_redis_SUITE.erl @@ -599,7 +599,14 @@ toxiproxy_redis_bridge_config() -> <<"worker_pool_size">> => <<"1">>, <<"batch_size">> => integer_to_binary(?BATCH_SIZE), <<"health_check_interval">> => <<"1s">>, - <<"start_timeout">> => <<"15s">> + <<"max_buffer_bytes">> => <<"256MB">>, + <<"buffer_seg_bytes">> => <<"10MB">>, + <<"request_ttl">> => <<"45s">>, + <<"inflight_window">> => <<"100">>, + <<"resume_interval">> => <<"1s">>, + <<"metrics_flush_interval">> => <<"1s">>, + <<"start_after_created">> => true, + <<"start_timeout">> => <<"5s">> } }, maps:merge(Conf0, ?COMMON_REDIS_OPTS). @@ -611,7 +618,14 @@ username_password_redis_bridge_config() -> <<"worker_pool_size">> => <<"1">>, <<"batch_size">> => integer_to_binary(?BATCH_SIZE), <<"health_check_interval">> => <<"1s">>, - <<"start_timeout">> => <<"15s">> + <<"max_buffer_bytes">> => <<"256MB">>, + <<"buffer_seg_bytes">> => <<"10MB">>, + <<"request_ttl">> => <<"45s">>, + <<"inflight_window">> => <<"100">>, + <<"resume_interval">> => <<"15s">>, + <<"metrics_flush_interval">> => <<"1s">>, + <<"start_after_created">> => true, + <<"start_timeout">> => <<"5s">> } }, Conf1 = maps:merge(Conf0, ?COMMON_REDIS_OPTS), diff --git a/apps/emqx_connector/src/schema/emqx_connector_schema.erl b/apps/emqx_connector/src/schema/emqx_connector_schema.erl index 381fe2c82..f67cd6991 100644 --- a/apps/emqx_connector/src/schema/emqx_connector_schema.erl +++ b/apps/emqx_connector/src/schema/emqx_connector_schema.erl @@ -290,7 +290,11 @@ transform_bridge_v1_config_to_action_config( TopMap = maps:with(TopKeys, ActionMap1), RestMap = maps:without(TopKeys, ActionMap1), %% Other parameters should be stuffed into `parameters' - emqx_utils_maps:deep_merge(TopMap, #{<<"parameters">> => RestMap}). + emqx_utils_maps:update_if_present( + <<"resource_opts">>, + fun emqx_bridge_v2_schema:project_to_actions_resource_opts/1, + emqx_utils_maps:deep_merge(TopMap, #{<<"parameters">> => RestMap}) + ). generate_connector_name(ConnectorsMap, BridgeName, Attempt) -> ConnectorNameList = diff --git a/apps/emqx_postgresql/src/emqx_postgresql.erl b/apps/emqx_postgresql/src/emqx_postgresql.erl index eaabb8b1c..e77a88c57 100644 --- a/apps/emqx_postgresql/src/emqx_postgresql.erl +++ b/apps/emqx_postgresql/src/emqx_postgresql.erl @@ -159,7 +159,9 @@ on_stop(InstId, State) -> connector => InstId }), close_connections(State), - emqx_resource_pool:stop(InstId). + Res = emqx_resource_pool:stop(InstId), + ?tp(postgres_stopped, #{instance_id => InstId}), + Res. close_connections(#{pool_name := PoolName} = _State) -> WorkerPids = [Worker || {_WorkerName, Worker} <- ecpool:workers(PoolName)], @@ -301,6 +303,7 @@ on_query( Type = pgsql_query_type(TypeOrKey), {NameOrSQL2, Data} = proc_sql_params(TypeOrKey, NameOrSQL, Params, State), Res = on_sql_query(InstId, PoolName, Type, NameOrSQL2, Data), + ?tp(postgres_bridge_connector_on_query_return, #{instance_id => InstId, result => Res}), handle_result(Res). pgsql_query_type(sql) ->