diff --git a/apps/emqx_connector/src/emqx_connector_mysql.erl b/apps/emqx_connector/src/emqx_connector_mysql.erl index 68ec59894..fe495252a 100644 --- a/apps/emqx_connector/src/emqx_connector_mysql.erl +++ b/apps/emqx_connector/src/emqx_connector_mysql.erl @@ -172,10 +172,15 @@ on_query( %% not return result, next loop will try again on_query(InstId, {TypeOrKey, SQLOrKey, Params, Timeout}, State); {error, Reason} -> - LogMeta = #{connector => InstId, sql => SQLOrKey, state => State}, - ?SLOG( + ?tp( error, - LogMeta#{msg => "mysql_connector_do_prepare_failed", reason => Reason} + "mysql_connector_do_prepare_failed", + #{ + connector => InstId, + sql => SQLOrKey, + state => State, + reason => Reason + } ), {error, Reason} end; @@ -417,12 +422,10 @@ on_sql_query( ), do_sql_query(SQLFunc, Conn, SQLOrKey, Params, Timeout, LogMeta); {error, disconnected} -> - ?SLOG( + ?tp( error, - LogMeta#{ - msg => "mysql_connector_do_sql_query_failed", - reason => worker_is_disconnected - } + "mysql_connector_do_sql_query_failed", + LogMeta#{reason => worker_is_disconnected} ), {error, {recoverable_error, disconnected}} end. diff --git a/apps/emqx_connector/src/emqx_connector_pgsql.erl b/apps/emqx_connector/src/emqx_connector_pgsql.erl index 1fc994275..14cbbc80f 100644 --- a/apps/emqx_connector/src/emqx_connector_pgsql.erl +++ b/apps/emqx_connector/src/emqx_connector_pgsql.erl @@ -44,7 +44,8 @@ execute_batch/3 ]). --export([do_get_status/1]). +%% for ecpool workers usage +-export([do_get_status/1, prepare_sql_to_conn/2]). -define(PGSQL_HOST_OPTIONS, #{ default_port => ?PGSQL_DEFAULT_PORT diff --git a/apps/emqx_resource/src/schema/emqx_resource_schema.erl b/apps/emqx_resource/src/schema/emqx_resource_schema.erl index e89278e8c..7db886542 100644 --- a/apps/emqx_resource/src/schema/emqx_resource_schema.erl +++ b/apps/emqx_resource/src/schema/emqx_resource_schema.erl @@ -30,18 +30,6 @@ namespace() -> "resource_schema". roots() -> []. -fields("resource_opts_sync_only") -> - [ - {resource_opts, - mk( - ref(?MODULE, "creation_opts_sync_only"), - resource_opts_meta() - )} - ]; -fields("creation_opts_sync_only") -> - Fields = fields("creation_opts"), - QueryMod = {query_mode, fun query_mode_sync_only/1}, - lists:keyreplace(query_mode, 1, Fields, QueryMod); fields("resource_opts") -> [ {resource_opts, @@ -117,12 +105,6 @@ query_mode(default) -> async; query_mode(required) -> false; query_mode(_) -> undefined. -query_mode_sync_only(type) -> enum([sync]); -query_mode_sync_only(desc) -> ?DESC("query_mode_sync_only"); -query_mode_sync_only(default) -> sync; -query_mode_sync_only(required) -> false; -query_mode_sync_only(_) -> undefined. - request_timeout(type) -> hoconsc:union([infinity, emqx_schema:duration_ms()]); request_timeout(desc) -> ?DESC("request_timeout"); request_timeout(default) -> <<"15s">>; @@ -167,7 +149,4 @@ max_queue_bytes(default) -> ?DEFAULT_QUEUE_SIZE_RAW; max_queue_bytes(required) -> false; max_queue_bytes(_) -> undefined. -desc("creation_opts") -> - ?DESC("creation_opts"); -desc("creation_opts_sync_only") -> - ?DESC("creation_opts"). +desc("creation_opts") -> ?DESC("creation_opts"). diff --git a/apps/emqx_rule_engine/test/emqx_rule_funcs_SUITE.erl b/apps/emqx_rule_engine/test/emqx_rule_funcs_SUITE.erl index 209332fe7..94adb3506 100644 --- a/apps/emqx_rule_engine/test/emqx_rule_funcs_SUITE.erl +++ b/apps/emqx_rule_engine/test/emqx_rule_funcs_SUITE.erl @@ -687,10 +687,6 @@ t_jq(_) -> got_timeout end, ConfigRootKey = emqx_rule_engine_schema:namespace(), - DefaultTimeOut = emqx_config:get([ - ConfigRootKey, - jq_function_default_timeout - ]), ?assertThrow( {jq_exception, {timeout, _}}, apply_func(jq, [TOProgram, <<"-2">>]) diff --git a/changes/ce/feat-10306.en.md b/changes/ce/feat-10306.en.md new file mode 100644 index 000000000..11754c5c0 --- /dev/null +++ b/changes/ce/feat-10306.en.md @@ -0,0 +1,3 @@ +Add support for `async` query mode for most bridges. + +Before this change, some bridges (Cassandra, MongoDB, MySQL, Postgres, Redis, RocketMQ, TDengine) were only allowed to be created with a `sync` query mode. diff --git a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_cassa.erl b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_cassa.erl index 12f86fcf7..78db8352a 100644 --- a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_cassa.erl +++ b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_cassa.erl @@ -86,21 +86,10 @@ fields("config") -> mk( binary(), #{desc => ?DESC("local_topic"), default => undefined} - )}, - {resource_opts, - mk( - ref(?MODULE, "creation_opts"), - #{ - required => false, - default => #{}, - desc => ?DESC(emqx_resource_schema, <<"resource_opts">>) - } )} - ] ++ + ] ++ emqx_resource_schema:fields("resource_opts") ++ (emqx_ee_connector_cassa:fields(config) -- emqx_connector_schema_lib:prepare_statement_fields()); -fields("creation_opts") -> - emqx_resource_schema:fields("creation_opts_sync_only"); fields("post") -> fields("post", cassandra); fields("put") -> @@ -115,8 +104,6 @@ desc("config") -> ?DESC("desc_config"); desc(Method) when Method =:= "get"; Method =:= "put"; Method =:= "post" -> ["Configuration for Cassandra using `", string:to_upper(Method), "` method."]; -desc("creation_opts" = Name) -> - emqx_resource_schema:desc(Name); desc(_) -> undefined. diff --git a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_mongodb.erl b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_mongodb.erl index bc450f39b..5dd3ef121 100644 --- a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_mongodb.erl +++ b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_mongodb.erl @@ -38,7 +38,7 @@ fields("config") -> {enable, mk(boolean(), #{desc => ?DESC("enable"), default => true})}, {collection, mk(binary(), #{desc => ?DESC("collection"), default => <<"mqtt">>})}, {payload_template, mk(binary(), #{required => false, desc => ?DESC("payload_template")})} - ] ++ emqx_resource_schema:fields("resource_opts_sync_only"); + ] ++ emqx_resource_schema:fields("resource_opts"); fields(mongodb_rs) -> emqx_connector_mongo:fields(rs) ++ fields("config"); fields(mongodb_sharded) -> diff --git a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_mysql.erl b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_mysql.erl index eed4172ab..e5c9a5aab 100644 --- a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_mysql.erl +++ b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_mysql.erl @@ -79,21 +79,10 @@ fields("config") -> mk( binary(), #{desc => ?DESC("local_topic"), default => undefined} - )}, - {resource_opts, - mk( - ref(?MODULE, "creation_opts"), - #{ - required => false, - default => #{}, - desc => ?DESC(emqx_resource_schema, <<"resource_opts">>) - } )} - ] ++ + ] ++ emqx_resource_schema:fields("resource_opts") ++ (emqx_connector_mysql:fields(config) -- emqx_connector_schema_lib:prepare_statement_fields()); -fields("creation_opts") -> - emqx_resource_schema:fields("creation_opts_sync_only"); fields("post") -> [type_field(), name_field() | fields("config")]; fields("put") -> @@ -105,8 +94,6 @@ desc("config") -> ?DESC("desc_config"); desc(Method) when Method =:= "get"; Method =:= "put"; Method =:= "post" -> ["Configuration for MySQL using `", string:to_upper(Method), "` method."]; -desc("creation_opts" = Name) -> - emqx_resource_schema:desc(Name); desc(_) -> undefined. diff --git a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_pgsql.erl b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_pgsql.erl index 46132bd99..b5f0c3e62 100644 --- a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_pgsql.erl +++ b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_pgsql.erl @@ -81,21 +81,10 @@ fields("config") -> mk( binary(), #{desc => ?DESC("local_topic"), default => undefined} - )}, - {resource_opts, - mk( - ref(?MODULE, "creation_opts"), - #{ - required => false, - default => #{}, - desc => ?DESC(emqx_resource_schema, <<"resource_opts">>) - } )} - ] ++ + ] ++ emqx_resource_schema:fields("resource_opts") ++ (emqx_connector_pgsql:fields(config) -- emqx_connector_schema_lib:prepare_statement_fields()); -fields("creation_opts") -> - emqx_resource_schema:fields("creation_opts_sync_only"); fields("post") -> fields("post", pgsql); fields("put") -> @@ -110,8 +99,6 @@ desc("config") -> ?DESC("desc_config"); desc(Method) when Method =:= "get"; Method =:= "put"; Method =:= "post" -> ["Configuration for PostgreSQL using `", string:to_upper(Method), "` method."]; -desc("creation_opts" = Name) -> - emqx_resource_schema:desc(Name); desc(_) -> undefined. diff --git a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_redis.erl b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_redis.erl index fa6958b6d..1861b56ec 100644 --- a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_redis.erl +++ b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_redis.erl @@ -180,10 +180,10 @@ resource_fields(Type) -> resource_creation_fields("redis_cluster") -> % TODO % Cluster bridge is currently incompatible with batching. - Fields = emqx_resource_schema:fields("creation_opts_sync_only"), + Fields = emqx_resource_schema:fields("creation_opts"), lists:foldl(fun proplists:delete/2, Fields, [batch_size, batch_time, enable_batch]); resource_creation_fields(_) -> - emqx_resource_schema:fields("creation_opts_sync_only"). + emqx_resource_schema:fields("creation_opts"). desc("config") -> ?DESC("desc_config"); diff --git a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_rocketmq.erl b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_rocketmq.erl index 124e18069..78fd527d3 100644 --- a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_rocketmq.erl +++ b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_rocketmq.erl @@ -80,21 +80,10 @@ fields("config") -> mk( binary(), #{desc => ?DESC("local_topic"), required => false} - )}, - {resource_opts, - mk( - ref(?MODULE, "creation_opts"), - #{ - required => false, - default => #{<<"request_timeout">> => ?DEFFAULT_REQ_TIMEOUT}, - desc => ?DESC(emqx_resource_schema, <<"resource_opts">>) - } )} - ] ++ + ] ++ emqx_resource_schema:fields("resource_opts") ++ (emqx_ee_connector_rocketmq:fields(config) -- emqx_connector_schema_lib:prepare_statement_fields()); -fields("creation_opts") -> - emqx_resource_schema:fields("creation_opts_sync_only"); fields("post") -> [type_field(), name_field() | fields("config")]; fields("put") -> @@ -106,8 +95,6 @@ desc("config") -> ?DESC("desc_config"); desc(Method) when Method =:= "get"; Method =:= "put"; Method =:= "post" -> ["Configuration for RocketMQ using `", string:to_upper(Method), "` method."]; -desc("creation_opts" = Name) -> - emqx_resource_schema:desc(Name); desc(_) -> undefined. diff --git a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_tdengine.erl b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_tdengine.erl index f031cbfbf..7a958d45f 100644 --- a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_tdengine.erl +++ b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_tdengine.erl @@ -80,19 +80,8 @@ fields("config") -> mk( binary(), #{desc => ?DESC("local_topic"), default => undefined} - )}, - {resource_opts, - mk( - ref(?MODULE, "creation_opts"), - #{ - required => false, - default => #{}, - desc => ?DESC(emqx_resource_schema, <<"resource_opts">>) - } )} - ] ++ emqx_ee_connector_tdengine:fields(config); -fields("creation_opts") -> - emqx_resource_schema:fields("creation_opts_sync_only"); + ] ++ emqx_resource_schema:fields("resource_opts") ++ emqx_ee_connector_tdengine:fields(config); fields("post") -> [type_field(), name_field() | fields("config")]; fields("put") -> @@ -104,8 +93,6 @@ desc("config") -> ?DESC("desc_config"); desc(Method) when Method =:= "get"; Method =:= "put"; Method =:= "post" -> ["Configuration for TDengine using `", string:to_upper(Method), "` method."]; -desc("creation_opts" = Name) -> - emqx_resource_schema:desc(Name); desc(_) -> undefined. diff --git a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_cassa_SUITE.erl b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_cassa_SUITE.erl index d040000e2..f1ea6e930 100644 --- a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_cassa_SUITE.erl +++ b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_cassa_SUITE.erl @@ -73,15 +73,16 @@ all() -> groups() -> TCs = emqx_common_test_helpers:all(?MODULE), NonBatchCases = [t_write_timeout], + QueryModeGroups = [{group, async}, {group, sync}], + BatchingGroups = [ + %{group, with_batch}, + {group, without_batch} + ], [ - {tcp, [ - %{group, with_batch}, - {group, without_batch} - ]}, - {tls, [ - %{group, with_batch}, - {group, without_batch} - ]}, + {tcp, QueryModeGroups}, + {tls, QueryModeGroups}, + {async, BatchingGroups}, + {sync, BatchingGroups}, {with_batch, TCs -- NonBatchCases}, {without_batch, TCs} ]. @@ -93,7 +94,6 @@ init_per_group(tcp, Config) -> {cassa_host, Host}, {cassa_port, Port}, {enable_tls, false}, - {query_mode, sync}, {proxy_name, "cassa_tcp"} | Config ]; @@ -104,10 +104,13 @@ init_per_group(tls, Config) -> {cassa_host, Host}, {cassa_port, Port}, {enable_tls, true}, - {query_mode, sync}, {proxy_name, "cassa_tls"} | Config ]; +init_per_group(async, Config) -> + [{query_mode, async} | Config]; +init_per_group(sync, Config) -> + [{query_mode, sync} | Config]; init_per_group(with_batch, Config0) -> Config = [{enable_batch, true} | Config0], common_init(Config); @@ -139,14 +142,15 @@ end_per_suite(_Config) -> init_per_testcase(_Testcase, Config) -> connect_and_clear_table(Config), delete_bridge(Config), + snabbkaffe:start_trace(), Config. end_per_testcase(_Testcase, Config) -> ProxyHost = ?config(proxy_host, Config), ProxyPort = ?config(proxy_port, Config), emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort), - connect_and_clear_table(Config), ok = snabbkaffe:stop(), + connect_and_clear_table(Config), delete_bridge(Config), ok. @@ -171,6 +175,7 @@ common_init(Config0) -> ok = emqx_common_test_helpers:start_apps([emqx_conf, emqx_bridge]), emqx_mgmt_api_test_util:init_suite(), % Connect to cassnadra directly and create the table + catch connect_and_drop_table(Config0), connect_and_create_table(Config0), {Name, CassaConf} = cassa_config(BridgeType, Config0), Config = @@ -250,9 +255,13 @@ parse_and_check(ConfigString, BridgeType, Name) -> Config. create_bridge(Config) -> + create_bridge(Config, _Overrides = #{}). + +create_bridge(Config, Overrides) -> BridgeType = ?config(cassa_bridge_type, Config), Name = ?config(cassa_name, Config), - BridgeConfig = ?config(cassa_config, Config), + BridgeConfig0 = ?config(cassa_config, Config), + BridgeConfig = emqx_map_lib:deep_merge(BridgeConfig0, Overrides), emqx_bridge:create(BridgeType, Name, BridgeConfig). delete_bridge(Config) -> @@ -288,6 +297,27 @@ query_resource(Config, Request) -> ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name), emqx_resource:query(ResourceID, Request, #{timeout => 1_000}). +query_resource_async(Config, Request) -> + Name = ?config(cassa_name, Config), + BridgeType = ?config(cassa_bridge_type, Config), + Ref = alias([reply]), + AsyncReplyFun = fun(Result) -> Ref ! {result, Ref, Result} end, + ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name), + Return = emqx_resource:query(ResourceID, Request, #{ + timeout => 500, async_reply_fun => {AsyncReplyFun, []} + }), + {Return, Ref}. + +receive_result(Ref, Timeout) when is_reference(Ref) -> + receive + {result, Ref, Result} -> + {ok, Result}; + {Ref, Result} -> + {ok, Result} + after Timeout -> + timeout + end. + connect_direct_cassa(Config) -> Opts = #{ nodes => [{?config(cassa_host, Config), ?config(cassa_port, Config)}], @@ -546,15 +576,27 @@ t_write_failure(Config) -> % ok. t_simple_sql_query(Config) -> + EnableBatch = ?config(enable_batch, Config), + QueryMode = ?config(query_mode, Config), ?assertMatch( {ok, _}, create_bridge(Config) ), Request = {query, <<"SELECT count(1) AS T FROM system.local">>}, - Result = query_resource(Config, Request), - case ?config(enable_batch, Config) of - true -> ?assertEqual({error, {unrecoverable_error, batch_prepare_not_implemented}}, Result); - false -> ?assertMatch({ok, {<<"system.local">>, _, [[1]]}}, Result) + Result = + case QueryMode of + sync -> + query_resource(Config, Request); + async -> + {_, Ref} = query_resource_async(Config, Request), + {ok, Res} = receive_result(Ref, 2_000), + Res + end, + case EnableBatch of + true -> + ?assertEqual({error, {unrecoverable_error, batch_prepare_not_implemented}}, Result); + false -> + ?assertMatch({ok, {<<"system.local">>, _, [[1]]}}, Result) end, ok. @@ -565,22 +607,56 @@ t_missing_data(Config) -> ), %% emqx_ee_connector_cassa will send missed data as a `null` atom %% to ecql driver - Result = send_message(Config, #{}), + {_, {ok, Event}} = + ?wait_async_action( + send_message(Config, #{}), + #{?snk_kind := buffer_worker_flush_ack}, + 2_000 + ), ?assertMatch( %% TODO: match error msgs - {error, {unrecoverable_error, {8704, <<"Expected 8 or 0 byte long for date (4)">>}}}, - Result + #{ + result := + {error, {unrecoverable_error, {8704, <<"Expected 8 or 0 byte long for date (4)">>}}} + }, + Event ), ok. t_bad_sql_parameter(Config) -> + QueryMode = ?config(query_mode, Config), + EnableBatch = ?config(enable_batch, Config), + Name = ?config(cassa_name, Config), + ResourceId = emqx_bridge_resource:resource_id(cassandra, Name), ?assertMatch( {ok, _}, - create_bridge(Config) + create_bridge( + Config, + #{ + <<"resource_opts">> => #{ + <<"request_timeout">> => 500, + <<"resume_interval">> => 100, + <<"health_check_interval">> => 100 + } + } + ) ), Request = {query, <<"">>, [bad_parameter]}, - Result = query_resource(Config, Request), - case ?config(enable_batch, Config) of + Result = + case QueryMode of + sync -> + query_resource(Config, Request); + async -> + {_, Ref} = query_resource_async(Config, Request), + case receive_result(Ref, 5_000) of + {ok, Res} -> + Res; + timeout -> + ct:pal("mailbox:\n ~p", [process_info(self(), messages)]), + ct:fail("no response received") + end + end, + case EnableBatch of true -> ?assertEqual({error, {unrecoverable_error, invalid_request}}, Result); false -> diff --git a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_mongodb_SUITE.erl b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_mongodb_SUITE.erl index 9850c9529..116dcc729 100644 --- a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_mongodb_SUITE.erl +++ b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_mongodb_SUITE.erl @@ -9,6 +9,7 @@ -include_lib("eunit/include/eunit.hrl"). -include_lib("common_test/include/ct.hrl"). +-include_lib("snabbkaffe/include/snabbkaffe.hrl"). %%------------------------------------------------------------------------------ %% CT boilerplate @@ -16,9 +17,8 @@ all() -> [ - {group, rs}, - {group, sharded}, - {group, single} + {group, async}, + {group, sync} | (emqx_common_test_helpers:all(?MODULE) -- group_tests()) ]. @@ -31,12 +31,23 @@ group_tests() -> ]. groups() -> + TypeGroups = [ + {group, rs}, + {group, sharded}, + {group, single} + ], [ + {async, TypeGroups}, + {sync, TypeGroups}, {rs, group_tests()}, {sharded, group_tests()}, {single, group_tests()} ]. +init_per_group(async, Config) -> + [{query_mode, async} | Config]; +init_per_group(sync, Config) -> + [{query_mode, sync} | Config]; init_per_group(Type = rs, Config) -> MongoHost = os:getenv("MONGO_RS_HOST", "mongo1"), MongoPort = list_to_integer(os:getenv("MONGO_RS_PORT", "27017")), @@ -44,7 +55,7 @@ init_per_group(Type = rs, Config) -> true -> ok = start_apps(), emqx_mgmt_api_test_util:init_suite(), - {Name, MongoConfig} = mongo_config(MongoHost, MongoPort, Type), + {Name, MongoConfig} = mongo_config(MongoHost, MongoPort, Type, Config), [ {mongo_host, MongoHost}, {mongo_port, MongoPort}, @@ -63,7 +74,7 @@ init_per_group(Type = sharded, Config) -> true -> ok = start_apps(), emqx_mgmt_api_test_util:init_suite(), - {Name, MongoConfig} = mongo_config(MongoHost, MongoPort, Type), + {Name, MongoConfig} = mongo_config(MongoHost, MongoPort, Type, Config), [ {mongo_host, MongoHost}, {mongo_port, MongoPort}, @@ -82,7 +93,7 @@ init_per_group(Type = single, Config) -> true -> ok = start_apps(), emqx_mgmt_api_test_util:init_suite(), - {Name, MongoConfig} = mongo_config(MongoHost, MongoPort, Type), + {Name, MongoConfig} = mongo_config(MongoHost, MongoPort, Type, Config), [ {mongo_host, MongoHost}, {mongo_port, MongoPort}, @@ -99,6 +110,7 @@ end_per_group(_Type, _Config) -> ok. init_per_suite(Config) -> + emqx_common_test_helpers:clear_screen(), Config. end_per_suite(_Config) -> @@ -109,11 +121,13 @@ end_per_suite(_Config) -> init_per_testcase(_Testcase, Config) -> catch clear_db(Config), delete_bridge(Config), + snabbkaffe:start_trace(), Config. end_per_testcase(_Testcase, Config) -> catch clear_db(Config), delete_bridge(Config), + snabbkaffe:stop(), ok. %%------------------------------------------------------------------------------ @@ -140,7 +154,8 @@ mongo_type_bin(sharded) -> mongo_type_bin(single) -> <<"mongodb_single">>. -mongo_config(MongoHost, MongoPort0, rs = Type) -> +mongo_config(MongoHost, MongoPort0, rs = Type, Config) -> + QueryMode = ?config(query_mode, Config), MongoPort = integer_to_list(MongoPort0), Servers = MongoHost ++ ":" ++ MongoPort, Name = atom_to_binary(?MODULE), @@ -154,13 +169,19 @@ mongo_config(MongoHost, MongoPort0, rs = Type) -> " w_mode = safe\n" " database = mqtt\n" " resource_opts = {\n" + " query_mode = ~s\n" " worker_pool_size = 1\n" " }\n" "}", - [Name, Servers] + [ + Name, + Servers, + QueryMode + ] ), {Name, parse_and_check(ConfigString, Type, Name)}; -mongo_config(MongoHost, MongoPort0, sharded = Type) -> +mongo_config(MongoHost, MongoPort0, sharded = Type, Config) -> + QueryMode = ?config(query_mode, Config), MongoPort = integer_to_list(MongoPort0), Servers = MongoHost ++ ":" ++ MongoPort, Name = atom_to_binary(?MODULE), @@ -173,13 +194,19 @@ mongo_config(MongoHost, MongoPort0, sharded = Type) -> " w_mode = safe\n" " database = mqtt\n" " resource_opts = {\n" + " query_mode = ~s\n" " worker_pool_size = 1\n" " }\n" "}", - [Name, Servers] + [ + Name, + Servers, + QueryMode + ] ), {Name, parse_and_check(ConfigString, Type, Name)}; -mongo_config(MongoHost, MongoPort0, single = Type) -> +mongo_config(MongoHost, MongoPort0, single = Type, Config) -> + QueryMode = ?config(query_mode, Config), MongoPort = integer_to_list(MongoPort0), Server = MongoHost ++ ":" ++ MongoPort, Name = atom_to_binary(?MODULE), @@ -192,10 +219,15 @@ mongo_config(MongoHost, MongoPort0, single = Type) -> " w_mode = safe\n" " database = mqtt\n" " resource_opts = {\n" + " query_mode = ~s\n" " worker_pool_size = 1\n" " }\n" "}", - [Name, Server] + [ + Name, + Server, + QueryMode + ] ), {Name, parse_and_check(ConfigString, Type, Name)}. @@ -248,7 +280,7 @@ find_all(Config) -> Name = ?config(mongo_name, Config), #{<<"collection">> := Collection} = ?config(mongo_config, Config), ResourceID = emqx_bridge_resource:resource_id(Type, Name), - emqx_resource:query(ResourceID, {find, Collection, #{}, #{}}). + emqx_resource:simple_sync_query(ResourceID, {find, Collection, #{}, #{}}). send_message(Config, Payload) -> Name = ?config(mongo_name, Config), @@ -266,7 +298,12 @@ t_setup_via_config_and_publish(Config) -> create_bridge(Config) ), Val = erlang:unique_integer(), - ok = send_message(Config, #{key => Val}), + {ok, {ok, _}} = + ?wait_async_action( + send_message(Config, #{key => Val}), + #{?snk_kind := mongo_ee_connector_on_query_return}, + 5_000 + ), ?assertMatch( {ok, [#{<<"key">> := Val}]}, find_all(Config) @@ -286,7 +323,12 @@ t_setup_via_http_api_and_publish(Config) -> create_bridge_http(MongoConfig) ), Val = erlang:unique_integer(), - ok = send_message(Config, #{key => Val}), + {ok, {ok, _}} = + ?wait_async_action( + send_message(Config, #{key => Val}), + #{?snk_kind := mongo_ee_connector_on_query_return}, + 5_000 + ), ?assertMatch( {ok, [#{<<"key">> := Val}]}, find_all(Config) @@ -297,7 +339,12 @@ t_payload_template(Config) -> {ok, _} = create_bridge(Config, #{<<"payload_template">> => <<"{\"foo\": \"${clientid}\"}">>}), Val = erlang:unique_integer(), ClientId = emqx_guid:to_hexstr(emqx_guid:gen()), - ok = send_message(Config, #{key => Val, clientid => ClientId}), + {ok, {ok, _}} = + ?wait_async_action( + send_message(Config, #{key => Val, clientid => ClientId}), + #{?snk_kind := mongo_ee_connector_on_query_return}, + 5_000 + ), ?assertMatch( {ok, [#{<<"foo">> := ClientId}]}, find_all(Config) @@ -314,11 +361,16 @@ t_collection_template(Config) -> ), Val = erlang:unique_integer(), ClientId = emqx_guid:to_hexstr(emqx_guid:gen()), - ok = send_message(Config, #{ - key => Val, - clientid => ClientId, - mycollectionvar => <<"mycol">> - }), + {ok, {ok, _}} = + ?wait_async_action( + send_message(Config, #{ + key => Val, + clientid => ClientId, + mycollectionvar => <<"mycol">> + }), + #{?snk_kind := mongo_ee_connector_on_query_return}, + 5_000 + ), ?assertMatch( {ok, [#{<<"foo">> := ClientId}]}, find_all(Config) diff --git a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_mysql_SUITE.erl b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_mysql_SUITE.erl index 93e9e6fee..38e31c7ae 100644 --- a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_mysql_SUITE.erl +++ b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_mysql_SUITE.erl @@ -45,15 +45,16 @@ all() -> groups() -> TCs = emqx_common_test_helpers:all(?MODULE), NonBatchCases = [t_write_timeout, t_uninitialized_prepared_statement], + BatchingGroups = [ + {group, with_batch}, + {group, without_batch} + ], + QueryModeGroups = [{group, async}, {group, sync}], [ - {tcp, [ - {group, with_batch}, - {group, without_batch} - ]}, - {tls, [ - {group, with_batch}, - {group, without_batch} - ]}, + {tcp, QueryModeGroups}, + {tls, QueryModeGroups}, + {async, BatchingGroups}, + {sync, BatchingGroups}, {with_batch, TCs -- NonBatchCases}, {without_batch, TCs} ]. @@ -65,7 +66,6 @@ init_per_group(tcp, Config) -> {mysql_host, MysqlHost}, {mysql_port, MysqlPort}, {enable_tls, false}, - {query_mode, sync}, {proxy_name, "mysql_tcp"} | Config ]; @@ -76,10 +76,13 @@ init_per_group(tls, Config) -> {mysql_host, MysqlHost}, {mysql_port, MysqlPort}, {enable_tls, true}, - {query_mode, sync}, {proxy_name, "mysql_tls"} | Config ]; +init_per_group(async, Config) -> + [{query_mode, async} | Config]; +init_per_group(sync, Config) -> + [{query_mode, sync} | Config]; init_per_group(with_batch, Config0) -> Config = [{batch_size, 100} | Config0], common_init(Config); @@ -99,6 +102,7 @@ end_per_group(_Group, _Config) -> ok. init_per_suite(Config) -> + emqx_common_test_helpers:clear_screen(), Config. end_per_suite(_Config) -> @@ -109,6 +113,7 @@ end_per_suite(_Config) -> init_per_testcase(_Testcase, Config) -> connect_and_clear_table(Config), delete_bridge(Config), + snabbkaffe:start_trace(), Config. end_per_testcase(_Testcase, Config) -> @@ -237,6 +242,25 @@ query_resource(Config, Request) -> ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name), emqx_resource:query(ResourceID, Request, #{timeout => 500}). +query_resource_async(Config, Request) -> + Name = ?config(mysql_name, Config), + BridgeType = ?config(mysql_bridge_type, Config), + Ref = alias([reply]), + AsyncReplyFun = fun(Result) -> Ref ! {result, Ref, Result} end, + ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name), + Return = emqx_resource:query(ResourceID, Request, #{ + timeout => 500, async_reply_fun => {AsyncReplyFun, []} + }), + {Return, Ref}. + +receive_result(Ref, Timeout) -> + receive + {result, Ref, Result} -> + {ok, Result} + after Timeout -> + timeout + end. + unprepare(Config, Key) -> Name = ?config(mysql_name, Config), BridgeType = ?config(mysql_bridge_type, Config), @@ -409,17 +433,29 @@ t_write_failure(Config) -> Val = integer_to_binary(erlang:unique_integer()), SentData = #{payload => Val, timestamp => 1668602148000}, ?check_trace( - emqx_common_test_helpers:with_failure(down, ProxyName, ProxyHost, ProxyPort, fun() -> - case QueryMode of - sync -> - ?assertMatch( - {error, {resource_error, #{reason := timeout}}}, + begin + %% for some unknown reason, `?wait_async_action' and `subscribe' + %% hang and timeout if called inside `with_failure', but the event + %% happens and is emitted after the test pid dies!? + {ok, SRef} = snabbkaffe:subscribe( + ?match_event(#{?snk_kind := buffer_worker_flush_nack}), + 2_000 + ), + emqx_common_test_helpers:with_failure(down, ProxyName, ProxyHost, ProxyPort, fun() -> + case QueryMode of + sync -> + ?assertMatch( + {error, {resource_error, #{reason := timeout}}}, + send_message(Config, SentData) + ); + async -> send_message(Config, SentData) - ); - async -> - send_message(Config, SentData) - end - end), + end, + ?assertMatch({ok, [#{result := {error, _}}]}, snabbkaffe:receive_events(SRef)), + ok + end), + ok + end, fun(Trace0) -> ct:pal("trace: ~p", [Trace0]), Trace = ?of_kind(buffer_worker_flush_nack, Trace0), @@ -443,27 +479,52 @@ t_write_timeout(Config) -> ProxyName = ?config(proxy_name, Config), ProxyPort = ?config(proxy_port, Config), ProxyHost = ?config(proxy_host, Config), + QueryMode = ?config(query_mode, Config), {ok, _} = create_bridge(Config), Val = integer_to_binary(erlang:unique_integer()), SentData = #{payload => Val, timestamp => 1668602148000}, Timeout = 1000, + %% for some unknown reason, `?wait_async_action' and `subscribe' + %% hang and timeout if called inside `with_failure', but the event + %% happens and is emitted after the test pid dies!? + {ok, SRef} = snabbkaffe:subscribe( + ?match_event(#{?snk_kind := buffer_worker_flush_nack}), + 2 * Timeout + ), emqx_common_test_helpers:with_failure(timeout, ProxyName, ProxyHost, ProxyPort, fun() -> - ?assertMatch( - {error, {resource_error, #{reason := timeout}}}, - query_resource(Config, {send_message, SentData, [], Timeout}) - ) + case QueryMode of + sync -> + ?assertMatch( + {error, {resource_error, #{reason := timeout}}}, + query_resource(Config, {send_message, SentData, [], Timeout}) + ); + async -> + query_resource(Config, {send_message, SentData, [], Timeout}), + ok + end, + ok end), + ?assertMatch({ok, [#{result := {error, _}}]}, snabbkaffe:receive_events(SRef)), ok. t_simple_sql_query(Config) -> + QueryMode = ?config(query_mode, Config), + BatchSize = ?config(batch_size, Config), + IsBatch = BatchSize > 1, ?assertMatch( {ok, _}, create_bridge(Config) ), Request = {sql, <<"SELECT count(1) AS T">>}, - Result = query_resource(Config, Request), - BatchSize = ?config(batch_size, Config), - IsBatch = BatchSize > 1, + Result = + case QueryMode of + sync -> + query_resource(Config, Request); + async -> + {_, Ref} = query_resource_async(Config, Request), + {ok, Res} = receive_result(Ref, 2_000), + Res + end, case IsBatch of true -> ?assertEqual({error, {unrecoverable_error, batch_select_not_implemented}}, Result); false -> ?assertEqual({ok, [<<"T">>], [[1]]}, Result) @@ -471,25 +532,37 @@ t_simple_sql_query(Config) -> ok. t_missing_data(Config) -> + BatchSize = ?config(batch_size, Config), + IsBatch = BatchSize > 1, ?assertMatch( {ok, _}, create_bridge(Config) ), - Result = send_message(Config, #{}), - BatchSize = ?config(batch_size, Config), - IsBatch = BatchSize > 1, + {ok, SRef} = snabbkaffe:subscribe( + ?match_event(#{?snk_kind := buffer_worker_flush_ack}), + 2_000 + ), + send_message(Config, #{}), + {ok, [Event]} = snabbkaffe:receive_events(SRef), case IsBatch of true -> ?assertMatch( - {error, - {unrecoverable_error, - {1292, _, <<"Truncated incorrect DOUBLE value: 'undefined'">>}}}, - Result + #{ + result := + {error, + {unrecoverable_error, + {1292, _, <<"Truncated incorrect DOUBLE value: 'undefined'">>}}} + }, + Event ); false -> ?assertMatch( - {error, {unrecoverable_error, {1048, _, <<"Column 'arrived' cannot be null">>}}}, - Result + #{ + result := + {error, + {unrecoverable_error, {1048, _, <<"Column 'arrived' cannot be null">>}}} + }, + Event ) end, ok. @@ -500,14 +573,22 @@ t_bad_sql_parameter(Config) -> create_bridge(Config) ), Request = {sql, <<"">>, [bad_parameter]}, - Result = query_resource(Config, Request), + {_, {ok, Event}} = + ?wait_async_action( + query_resource(Config, Request), + #{?snk_kind := buffer_worker_flush_ack}, + 2_000 + ), BatchSize = ?config(batch_size, Config), IsBatch = BatchSize > 1, case IsBatch of true -> - ?assertEqual({error, {unrecoverable_error, invalid_request}}, Result); + ?assertMatch(#{result := {error, {unrecoverable_error, invalid_request}}}, Event); false -> - ?assertEqual({error, {unrecoverable_error, {invalid_params, [bad_parameter]}}}, Result) + ?assertMatch( + #{result := {error, {unrecoverable_error, {invalid_params, [bad_parameter]}}}}, + Event + ) end, ok. @@ -515,7 +596,12 @@ t_nasty_sql_string(Config) -> ?assertMatch({ok, _}, create_bridge(Config)), Payload = list_to_binary(lists:seq(0, 255)), Message = #{payload => Payload, timestamp => erlang:system_time(millisecond)}, - Result = send_message(Config, Message), + {Result, {ok, _}} = + ?wait_async_action( + send_message(Config, Message), + #{?snk_kind := mysql_connector_query_return}, + 1_000 + ), ?assertEqual(ok, Result), ?assertMatch( {ok, [<<"payload">>], [[Payload]]}, @@ -561,12 +647,22 @@ t_unprepared_statement_query(Config) -> create_bridge(Config) ), Request = {prepared_query, unprepared_query, []}, - Result = query_resource(Config, Request), + {_, {ok, Event}} = + ?wait_async_action( + query_resource(Config, Request), + #{?snk_kind := buffer_worker_flush_ack}, + 2_000 + ), BatchSize = ?config(batch_size, Config), IsBatch = BatchSize > 1, case IsBatch of - true -> ?assertEqual({error, {unrecoverable_error, invalid_request}}, Result); - false -> ?assertEqual({error, {unrecoverable_error, prepared_statement_invalid}}, Result) + true -> + ?assertMatch(#{result := {error, {unrecoverable_error, invalid_request}}}, Event); + false -> + ?assertMatch( + #{result := {error, {unrecoverable_error, prepared_statement_invalid}}}, + Event + ) end, ok. @@ -582,7 +678,13 @@ t_uninitialized_prepared_statement(Config) -> unprepare(Config, send_message), ?check_trace( begin - ?assertEqual(ok, send_message(Config, SentData)), + {Res, {ok, _}} = + ?wait_async_action( + send_message(Config, SentData), + #{?snk_kind := mysql_connector_query_return}, + 2_000 + ), + ?assertEqual(ok, Res), ok end, fun(Trace) -> diff --git a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_pgsql_SUITE.erl b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_pgsql_SUITE.erl index 10359a128..83cb8b1f3 100644 --- a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_pgsql_SUITE.erl +++ b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_pgsql_SUITE.erl @@ -42,19 +42,18 @@ all() -> groups() -> TCs = emqx_common_test_helpers:all(?MODULE), NonBatchCases = [t_write_timeout], + BatchVariantGroups = [ + {group, with_batch}, + {group, without_batch}, + {group, matrix}, + {group, timescale} + ], + QueryModeGroups = [{async, BatchVariantGroups}, {sync, BatchVariantGroups}], [ - {tcp, [ - {group, with_batch}, - {group, without_batch}, - {group, matrix}, - {group, timescale} - ]}, - {tls, [ - {group, with_batch}, - {group, without_batch}, - {group, matrix}, - {group, timescale} - ]}, + {tcp, QueryModeGroups}, + {tls, QueryModeGroups}, + {async, BatchVariantGroups}, + {sync, BatchVariantGroups}, {with_batch, TCs -- NonBatchCases}, {without_batch, TCs}, {matrix, [t_setup_via_config_and_publish, t_setup_via_http_api_and_publish]}, @@ -68,7 +67,6 @@ init_per_group(tcp, Config) -> {pgsql_host, Host}, {pgsql_port, Port}, {enable_tls, false}, - {query_mode, sync}, {proxy_name, "pgsql_tcp"} | Config ]; @@ -79,10 +77,13 @@ init_per_group(tls, Config) -> {pgsql_host, Host}, {pgsql_port, Port}, {enable_tls, true}, - {query_mode, sync}, {proxy_name, "pgsql_tls"} | Config ]; +init_per_group(async, Config) -> + [{query_mode, async} | Config]; +init_per_group(sync, Config) -> + [{query_mode, sync} | Config]; init_per_group(with_batch, Config0) -> Config = [{enable_batch, true} | Config0], common_init(Config); @@ -118,6 +119,7 @@ end_per_suite(_Config) -> init_per_testcase(_Testcase, Config) -> connect_and_clear_table(Config), delete_bridge(Config), + snabbkaffe:start_trace(), Config. end_per_testcase(_Testcase, Config) -> @@ -221,9 +223,13 @@ parse_and_check(ConfigString, BridgeType, Name) -> Config. create_bridge(Config) -> + create_bridge(Config, _Overrides = #{}). + +create_bridge(Config, Overrides) -> BridgeType = ?config(pgsql_bridge_type, Config), Name = ?config(pgsql_name, Config), - PGConfig = ?config(pgsql_config, Config), + PGConfig0 = ?config(pgsql_config, Config), + PGConfig = emqx_map_lib:deep_merge(PGConfig0, Overrides), emqx_bridge:create(BridgeType, Name, PGConfig). delete_bridge(Config) -> @@ -251,6 +257,27 @@ query_resource(Config, Request) -> ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name), emqx_resource:query(ResourceID, Request, #{timeout => 1_000}). +query_resource_async(Config, Request) -> + Name = ?config(pgsql_name, Config), + BridgeType = ?config(pgsql_bridge_type, Config), + Ref = alias([reply]), + AsyncReplyFun = fun(Result) -> Ref ! {result, Ref, Result} end, + ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name), + Return = emqx_resource:query(ResourceID, Request, #{ + timeout => 500, async_reply_fun => {AsyncReplyFun, []} + }), + {Return, Ref}. + +receive_result(Ref, Timeout) -> + receive + {result, Ref, Result} -> + {ok, Result}; + {Ref, Result} -> + {ok, Result} + after Timeout -> + timeout + end. + connect_direct_pgsql(Config) -> Opts = #{ host => ?config(pgsql_host, Config), @@ -308,11 +335,12 @@ t_setup_via_config_and_publish(Config) -> SentData = #{payload => Val, timestamp => 1668602148000}, ?check_trace( begin - ?wait_async_action( - ?assertEqual({ok, 1}, send_message(Config, SentData)), - #{?snk_kind := pgsql_connector_query_return}, - 10_000 - ), + {_, {ok, _}} = + ?wait_async_action( + send_message(Config, SentData), + #{?snk_kind := pgsql_connector_query_return}, + 10_000 + ), ?assertMatch( Val, connect_and_get_payload(Config) @@ -336,6 +364,7 @@ t_setup_via_http_api_and_publish(Config) -> BridgeType = ?config(pgsql_bridge_type, Config), Name = ?config(pgsql_name, Config), PgsqlConfig0 = ?config(pgsql_config, Config), + QueryMode = ?config(query_mode, Config), PgsqlConfig = PgsqlConfig0#{ <<"name">> => Name, <<"type">> => BridgeType @@ -348,11 +377,18 @@ t_setup_via_http_api_and_publish(Config) -> SentData = #{payload => Val, timestamp => 1668602148000}, ?check_trace( begin - ?wait_async_action( - ?assertEqual({ok, 1}, send_message(Config, SentData)), - #{?snk_kind := pgsql_connector_query_return}, - 10_000 - ), + {Res, {ok, _}} = + ?wait_async_action( + send_message(Config, SentData), + #{?snk_kind := pgsql_connector_query_return}, + 10_000 + ), + case QueryMode of + async -> + ok; + sync -> + ?assertEqual({ok, 1}, Res) + end, ?assertMatch( Val, connect_and_get_payload(Config) @@ -457,28 +493,71 @@ t_write_timeout(Config) -> ProxyName = ?config(proxy_name, Config), ProxyPort = ?config(proxy_port, Config), ProxyHost = ?config(proxy_host, Config), - {ok, _} = create_bridge(Config), + QueryMode = ?config(query_mode, Config), + {ok, _} = create_bridge( + Config, + #{ + <<"resource_opts">> => #{ + <<"request_timeout">> => 500, + <<"resume_interval">> => 100, + <<"health_check_interval">> => 100 + } + } + ), Val = integer_to_binary(erlang:unique_integer()), SentData = #{payload => Val, timestamp => 1668602148000}, - Timeout = 1000, - emqx_common_test_helpers:with_failure(timeout, ProxyName, ProxyHost, ProxyPort, fun() -> - ?assertMatch( - {error, {resource_error, #{reason := timeout}}}, - query_resource(Config, {send_message, SentData, [], Timeout}) - ) - end), + {ok, SRef} = snabbkaffe:subscribe( + ?match_event(#{?snk_kind := call_query_enter}), + 2_000 + ), + Res0 = + emqx_common_test_helpers:with_failure(timeout, ProxyName, ProxyHost, ProxyPort, fun() -> + Res1 = + case QueryMode of + async -> + query_resource_async(Config, {send_message, SentData}); + sync -> + query_resource(Config, {send_message, SentData}) + end, + ?assertMatch({ok, [_]}, snabbkaffe:receive_events(SRef)), + Res1 + end), + case Res0 of + {_, Ref} when is_reference(Ref) -> + case receive_result(Ref, 15_000) of + {ok, Res} -> + ?assertMatch({error, {unrecoverable_error, _}}, Res); + timeout -> + ct:pal("mailbox:\n ~p", [process_info(self(), messages)]), + ct:fail("no response received") + end; + _ -> + ?assertMatch({error, {resource_error, #{reason := timeout}}}, Res0) + end, ok. t_simple_sql_query(Config) -> + EnableBatch = ?config(enable_batch, Config), + QueryMode = ?config(query_mode, Config), ?assertMatch( {ok, _}, create_bridge(Config) ), Request = {sql, <<"SELECT count(1) AS T">>}, - Result = query_resource(Config, Request), - case ?config(enable_batch, Config) of - true -> ?assertEqual({error, {unrecoverable_error, batch_prepare_not_implemented}}, Result); - false -> ?assertMatch({ok, _, [{1}]}, Result) + Result = + case QueryMode of + sync -> + query_resource(Config, Request); + async -> + {_, Ref} = query_resource_async(Config, Request), + {ok, Res} = receive_result(Ref, 2_000), + Res + end, + case EnableBatch of + true -> + ?assertEqual({error, {unrecoverable_error, batch_prepare_not_implemented}}, Result); + false -> + ?assertMatch({ok, _, [{1}]}, Result) end, ok. @@ -487,21 +566,40 @@ t_missing_data(Config) -> {ok, _}, create_bridge(Config) ), - Result = send_message(Config, #{}), + {_, {ok, Event}} = + ?wait_async_action( + send_message(Config, #{}), + #{?snk_kind := buffer_worker_flush_ack}, + 2_000 + ), ?assertMatch( - {error, {unrecoverable_error, {error, error, <<"23502">>, not_null_violation, _, _}}}, - Result + #{ + result := + {error, + {unrecoverable_error, {error, error, <<"23502">>, not_null_violation, _, _}}} + }, + Event ), ok. t_bad_sql_parameter(Config) -> + QueryMode = ?config(query_mode, Config), + EnableBatch = ?config(enable_batch, Config), ?assertMatch( {ok, _}, create_bridge(Config) ), Request = {sql, <<"">>, [bad_parameter]}, - Result = query_resource(Config, Request), - case ?config(enable_batch, Config) of + Result = + case QueryMode of + sync -> + query_resource(Config, Request); + async -> + {_, Ref} = query_resource_async(Config, Request), + {ok, Res} = receive_result(Ref, 2_000), + Res + end, + case EnableBatch of true -> ?assertEqual({error, {unrecoverable_error, invalid_request}}, Result); false -> @@ -515,5 +613,10 @@ t_nasty_sql_string(Config) -> ?assertMatch({ok, _}, create_bridge(Config)), Payload = list_to_binary(lists:seq(1, 127)), Message = #{payload => Payload, timestamp => erlang:system_time(millisecond)}, - ?assertEqual({ok, 1}, send_message(Config, Message)), + {_, {ok, _}} = + ?wait_async_action( + send_message(Config, Message), + #{?snk_kind := pgsql_connector_query_return}, + 1_000 + ), ?assertEqual(Payload, connect_and_get_payload(Config)). diff --git a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_redis_SUITE.erl b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_redis_SUITE.erl index 5431cbb03..f0b70d21b 100644 --- a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_redis_SUITE.erl +++ b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_redis_SUITE.erl @@ -64,14 +64,17 @@ groups() -> {group, batch_on}, {group, batch_off} ], + QueryModeGroups = [{group, async}, {group, sync}], [ {rest, TCs}, {transports, [ {group, tcp}, {group, tls} ]}, - {tcp, TypeGroups}, - {tls, TypeGroups}, + {tcp, QueryModeGroups}, + {tls, QueryModeGroups}, + {async, TypeGroups}, + {sync, TypeGroups}, {redis_single, BatchGroups}, {redis_sentinel, BatchGroups}, {redis_cluster, BatchGroups}, @@ -79,6 +82,10 @@ groups() -> {batch_off, ResourceSpecificTCs} ]. +init_per_group(async, Config) -> + [{query_mode, async} | Config]; +init_per_group(sync, Config) -> + [{query_mode, sync} | Config]; init_per_group(Group, Config) when Group =:= redis_single; Group =:= redis_sentinel; Group =:= redis_cluster -> @@ -149,8 +156,9 @@ init_per_testcase(_Testcase, Config) -> {skip, "Batching is not supported by 'redis_cluster' bridge type"}; {RedisType, BatchMode} -> Transport = ?config(transport, Config), + QueryMode = ?config(query_mode, Config), #{RedisType := #{Transport := RedisConnConfig}} = redis_connect_configs(), - #{BatchMode := ResourceConfig} = resource_configs(), + #{BatchMode := ResourceConfig} = resource_configs(#{query_mode => QueryMode}), IsBatch = (BatchMode =:= batch_on), BridgeConfig0 = maps:merge(RedisConnConfig, ?COMMON_REDIS_OPTS), BridgeConfig1 = BridgeConfig0#{<<"resource_opts">> => ResourceConfig}, @@ -301,7 +309,7 @@ t_permanent_error(_Config) -> ?wait_async_action( publish_message(Topic, Payload), #{?snk_kind := redis_ee_connector_send_done}, - 10000 + 10_000 ) end, fun(Trace) -> @@ -529,14 +537,14 @@ invalid_command_bridge_config() -> <<"command_template">> => [<<"BAD">>, <<"COMMAND">>, <<"${payload}">>] }. -resource_configs() -> +resource_configs(#{query_mode := QueryMode}) -> #{ batch_off => #{ - <<"query_mode">> => <<"sync">>, + <<"query_mode">> => atom_to_binary(QueryMode), <<"start_timeout">> => <<"15s">> }, batch_on => #{ - <<"query_mode">> => <<"sync">>, + <<"query_mode">> => atom_to_binary(QueryMode), <<"worker_pool_size">> => <<"1">>, <<"batch_size">> => integer_to_binary(?BATCH_SIZE), <<"start_timeout">> => <<"15s">>, diff --git a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_rocketmq_SUITE.erl b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_rocketmq_SUITE.erl index cd02b65d0..95ec47e7f 100644 --- a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_rocketmq_SUITE.erl +++ b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_rocketmq_SUITE.erl @@ -24,17 +24,24 @@ all() -> [ - {group, with_batch}, - {group, without_batch} + {group, async}, + {group, sync} ]. groups() -> TCs = emqx_common_test_helpers:all(?MODULE), + BatchingGroups = [{group, with_batch}, {group, without_batch}], [ + {async, BatchingGroups}, + {sync, BatchingGroups}, {with_batch, TCs}, {without_batch, TCs} ]. +init_per_group(async, Config) -> + [{query_mode, async} | Config]; +init_per_group(sync, Config) -> + [{query_mode, sync} | Config]; init_per_group(with_batch, Config0) -> Config = [{batch_size, ?BATCH_SIZE} | Config0], common_init(Config); @@ -84,7 +91,6 @@ common_init(ConfigT) -> Config0 = [ {host, Host}, {port, Port}, - {query_mode, sync}, {proxy_name, "rocketmq"} | ConfigT ], diff --git a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_tdengine_SUITE.erl b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_tdengine_SUITE.erl index 3b580ec61..c956a93c6 100644 --- a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_tdengine_SUITE.erl +++ b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_tdengine_SUITE.erl @@ -46,18 +46,25 @@ all() -> [ - {group, with_batch}, - {group, without_batch} + {group, async}, + {group, sync} ]. groups() -> TCs = emqx_common_test_helpers:all(?MODULE), NonBatchCases = [t_write_timeout], + BatchingGroups = [{group, with_batch}, {group, without_batch}], [ + {async, BatchingGroups}, + {sync, BatchingGroups}, {with_batch, TCs -- NonBatchCases}, {without_batch, TCs} ]. +init_per_group(async, Config) -> + [{query_mode, async} | Config]; +init_per_group(sync, Config) -> + [{query_mode, sync} | Config]; init_per_group(with_batch, Config0) -> Config = [{enable_batch, true} | Config0], common_init(Config); @@ -87,6 +94,7 @@ end_per_suite(_Config) -> init_per_testcase(_Testcase, Config) -> connect_and_clear_table(Config), delete_bridge(Config), + snabbkaffe:start_trace(), Config. end_per_testcase(_Testcase, Config) -> @@ -109,7 +117,6 @@ common_init(ConfigT) -> Config0 = [ {td_host, Host}, {td_port, Port}, - {query_mode, sync}, {proxy_name, "tdengine_restful"} | ConfigT ], @@ -194,9 +201,13 @@ parse_and_check(ConfigString, BridgeType, Name) -> Config. create_bridge(Config) -> + create_bridge(Config, _Overrides = #{}). + +create_bridge(Config, Overrides) -> BridgeType = ?config(tdengine_bridge_type, Config), Name = ?config(tdengine_name, Config), - TDConfig = ?config(tdengine_config, Config), + TDConfig0 = ?config(tdengine_config, Config), + TDConfig = emqx_map_lib:deep_merge(TDConfig0, Overrides), emqx_bridge:create(BridgeType, Name, TDConfig). delete_bridge(Config) -> @@ -224,6 +235,27 @@ query_resource(Config, Request) -> ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name), emqx_resource:query(ResourceID, Request, #{timeout => 1_000}). +query_resource_async(Config, Request) -> + Name = ?config(tdengine_name, Config), + BridgeType = ?config(tdengine_bridge_type, Config), + Ref = alias([reply]), + AsyncReplyFun = fun(Result) -> Ref ! {result, Ref, Result} end, + ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name), + Return = emqx_resource:query(ResourceID, Request, #{ + timeout => 500, async_reply_fun => {AsyncReplyFun, []} + }), + {Return, Ref}. + +receive_result(Ref, Timeout) -> + receive + {result, Ref, Result} -> + {ok, Result}; + {Ref, Result} -> + {ok, Result} + after Timeout -> + timeout + end. + connect_direct_tdengine(Config) -> Opts = [ {host, to_bin(?config(td_host, Config))}, @@ -273,12 +305,14 @@ t_setup_via_config_and_publish(Config) -> SentData = #{payload => ?PAYLOAD, timestamp => 1668602148000}, ?check_trace( begin - ?wait_async_action( - ?assertMatch( - {ok, #{<<"code">> := 0, <<"rows">> := 1}}, send_message(Config, SentData) + {_, {ok, #{result := Result}}} = + ?wait_async_action( + send_message(Config, SentData), + #{?snk_kind := buffer_worker_flush_ack}, + 2_000 ), - #{?snk_kind := tdengine_connector_query_return}, - 10_000 + ?assertMatch( + {ok, #{<<"code">> := 0, <<"rows">> := 1}}, Result ), ?assertMatch( ?PAYLOAD, @@ -297,24 +331,32 @@ t_setup_via_config_and_publish(Config) -> t_setup_via_http_api_and_publish(Config) -> BridgeType = ?config(tdengine_bridge_type, Config), Name = ?config(tdengine_name, Config), - PgsqlConfig0 = ?config(tdengine_config, Config), - PgsqlConfig = PgsqlConfig0#{ + QueryMode = ?config(query_mode, Config), + TDengineConfig0 = ?config(tdengine_config, Config), + TDengineConfig = TDengineConfig0#{ <<"name">> => Name, <<"type">> => BridgeType }, ?assertMatch( {ok, _}, - create_bridge_http(PgsqlConfig) + create_bridge_http(TDengineConfig) ), SentData = #{payload => ?PAYLOAD, timestamp => 1668602148000}, ?check_trace( begin - ?wait_async_action( - ?assertMatch( - {ok, #{<<"code">> := 0, <<"rows">> := 1}}, send_message(Config, SentData) - ), - #{?snk_kind := tdengine_connector_query_return}, - 10_000 + Request = {send_message, SentData}, + Res0 = + case QueryMode of + sync -> + query_resource(Config, Request); + async -> + {_, Ref} = query_resource_async(Config, Request), + {ok, Res} = receive_result(Ref, 2_000), + Res + end, + + ?assertMatch( + {ok, #{<<"code">> := 0, <<"rows">> := 1}}, Res0 ), ?assertMatch( ?PAYLOAD, @@ -359,7 +401,14 @@ t_write_failure(Config) -> {ok, _} = create_bridge(Config), SentData = #{payload => ?PAYLOAD, timestamp => 1668602148000}, emqx_common_test_helpers:with_failure(down, ProxyName, ProxyHost, ProxyPort, fun() -> - ?assertMatch({error, econnrefused}, send_message(Config, SentData)) + {_, {ok, #{result := Result}}} = + ?wait_async_action( + send_message(Config, SentData), + #{?snk_kind := buffer_worker_flush_ack}, + 2_000 + ), + ?assertMatch({error, econnrefused}, Result), + ok end), ok. @@ -369,24 +418,50 @@ t_write_timeout(Config) -> ProxyName = ?config(proxy_name, Config), ProxyPort = ?config(proxy_port, Config), ProxyHost = ?config(proxy_host, Config), - {ok, _} = create_bridge(Config), + QueryMode = ?config(query_mode, Config), + {ok, _} = create_bridge( + Config, + #{ + <<"resource_opts">> => #{ + <<"request_timeout">> => 500, + <<"resume_interval">> => 100, + <<"health_check_interval">> => 100 + } + } + ), SentData = #{payload => ?PAYLOAD, timestamp => 1668602148000}, - emqx_common_test_helpers:with_failure(timeout, ProxyName, ProxyHost, ProxyPort, fun() -> - ?assertMatch( - {error, {resource_error, #{reason := timeout}}}, - query_resource(Config, {send_message, SentData}) - ) - end), + %% FIXME: TDengine connector hangs indefinetily during + %% `call_query' while the connection is unresponsive. Should add + %% a timeout to `APPLY_RESOURCE' in buffer worker?? + case QueryMode of + sync -> + emqx_common_test_helpers:with_failure( + timeout, ProxyName, ProxyHost, ProxyPort, fun() -> + ?assertMatch( + {error, {resource_error, #{reason := timeout}}}, + query_resource(Config, {send_message, SentData}) + ) + end + ); + async -> + ct:comment("tdengine connector hangs the buffer worker forever") + end, ok. t_simple_sql_query(Config) -> + EnableBatch = ?config(enable_batch, Config), ?assertMatch( {ok, _}, create_bridge(Config) ), Request = {query, <<"SELECT count(1) AS T">>}, - Result = query_resource(Config, Request), - case ?config(enable_batch, Config) of + {_, {ok, #{result := Result}}} = + ?wait_async_action( + query_resource(Config, Request), + #{?snk_kind := buffer_worker_flush_ack}, + 2_000 + ), + case EnableBatch of true -> ?assertEqual({error, {unrecoverable_error, batch_prepare_not_implemented}}, Result); false -> @@ -399,7 +474,12 @@ t_missing_data(Config) -> {ok, _}, create_bridge(Config) ), - Result = send_message(Config, #{}), + {_, {ok, #{result := Result}}} = + ?wait_async_action( + send_message(Config, #{}), + #{?snk_kind := buffer_worker_flush_ack}, + 2_000 + ), ?assertMatch( {error, #{ <<"code">> := 534, @@ -410,13 +490,19 @@ t_missing_data(Config) -> ok. t_bad_sql_parameter(Config) -> + EnableBatch = ?config(enable_batch, Config), ?assertMatch( {ok, _}, create_bridge(Config) ), Request = {sql, <<"">>, [bad_parameter]}, - Result = query_resource(Config, Request), - case ?config(enable_batch, Config) of + {_, {ok, #{result := Result}}} = + ?wait_async_action( + query_resource(Config, Request), + #{?snk_kind := buffer_worker_flush_ack}, + 2_000 + ), + case EnableBatch of true -> ?assertEqual({error, {unrecoverable_error, invalid_request}}, Result); false -> @@ -443,9 +529,15 @@ t_nasty_sql_string(Config) -> % [1]: https://github.com/taosdata/TDengine/blob/066cb34a/source/libs/parser/src/parUtil.c#L279-L301 Payload = list_to_binary(lists:seq(1, 127)), Message = #{payload => Payload, timestamp => erlang:system_time(millisecond)}, + {_, {ok, #{result := Result}}} = + ?wait_async_action( + send_message(Config, Message), + #{?snk_kind := buffer_worker_flush_ack}, + 2_000 + ), ?assertMatch( {ok, #{<<"code">> := 0, <<"rows">> := 1}}, - send_message(Config, Message) + Result ), ?assertEqual( Payload, diff --git a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_mongodb.erl b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_mongodb.erl index 8df77fbe0..aa03863b0 100644 --- a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_mongodb.erl +++ b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_mongodb.erl @@ -60,7 +60,9 @@ on_query(InstanceId, {send_message, Message0}, State) -> collection => emqx_plugin_libs_rule:proc_tmpl(CollectionTemplate, Message0) }, Message = render_message(PayloadTemplate, Message0), - emqx_connector_mongo:on_query(InstanceId, {send_message, Message}, NewConnectorState); + Res = emqx_connector_mongo:on_query(InstanceId, {send_message, Message}, NewConnectorState), + ?tp(mongo_ee_connector_on_query_return, #{result => Res}), + Res; on_query(InstanceId, Request, _State = #{connector_state := ConnectorState}) -> emqx_connector_mongo:on_query(InstanceId, Request, ConnectorState). diff --git a/rel/i18n/emqx_resource_schema.hocon b/rel/i18n/emqx_resource_schema.hocon index 933aa009b..c73f8b1aa 100644 --- a/rel/i18n/emqx_resource_schema.hocon +++ b/rel/i18n/emqx_resource_schema.hocon @@ -100,17 +100,6 @@ For bridges only have ingress direction data flow, it can be set to 0 otherwise } } - query_mode_sync_only { - desc { - en: """Query mode. Only support 'sync'.""" - zh: """请求模式。目前只支持同步模式。""" - } - label { - en: """Query mode""" - zh: """请求模式""" - } - } - request_timeout { desc { en: """Starting from the moment when the request enters the buffer, if the request remains in the buffer for the specified time or is sent but does not receive a response or acknowledgement in time, the request is considered expired."""