Merge pull request #10306 from thalesmg/enable-async-buffer-workers-all-bridges-v50

feat(bridges): enable async query mode for all bridges with buffer workers
This commit is contained in:
Thales Macedo Garitezi 2023-04-04 17:10:46 -03:00 committed by GitHub
commit 5d5b7ea215
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
21 changed files with 639 additions and 292 deletions

View File

@ -172,10 +172,15 @@ on_query(
%% not return result, next loop will try again
on_query(InstId, {TypeOrKey, SQLOrKey, Params, Timeout}, State);
{error, Reason} ->
LogMeta = #{connector => InstId, sql => SQLOrKey, state => State},
?SLOG(
?tp(
error,
LogMeta#{msg => "mysql_connector_do_prepare_failed", reason => Reason}
"mysql_connector_do_prepare_failed",
#{
connector => InstId,
sql => SQLOrKey,
state => State,
reason => Reason
}
),
{error, Reason}
end;
@ -417,12 +422,10 @@ on_sql_query(
),
do_sql_query(SQLFunc, Conn, SQLOrKey, Params, Timeout, LogMeta);
{error, disconnected} ->
?SLOG(
?tp(
error,
LogMeta#{
msg => "mysql_connector_do_sql_query_failed",
reason => worker_is_disconnected
}
"mysql_connector_do_sql_query_failed",
LogMeta#{reason => worker_is_disconnected}
),
{error, {recoverable_error, disconnected}}
end.

View File

@ -44,7 +44,8 @@
execute_batch/3
]).
-export([do_get_status/1]).
%% for ecpool workers usage
-export([do_get_status/1, prepare_sql_to_conn/2]).
-define(PGSQL_HOST_OPTIONS, #{
default_port => ?PGSQL_DEFAULT_PORT

View File

@ -30,18 +30,6 @@ namespace() -> "resource_schema".
roots() -> [].
fields("resource_opts_sync_only") ->
[
{resource_opts,
mk(
ref(?MODULE, "creation_opts_sync_only"),
resource_opts_meta()
)}
];
fields("creation_opts_sync_only") ->
Fields = fields("creation_opts"),
QueryMod = {query_mode, fun query_mode_sync_only/1},
lists:keyreplace(query_mode, 1, Fields, QueryMod);
fields("resource_opts") ->
[
{resource_opts,
@ -117,12 +105,6 @@ query_mode(default) -> async;
query_mode(required) -> false;
query_mode(_) -> undefined.
query_mode_sync_only(type) -> enum([sync]);
query_mode_sync_only(desc) -> ?DESC("query_mode_sync_only");
query_mode_sync_only(default) -> sync;
query_mode_sync_only(required) -> false;
query_mode_sync_only(_) -> undefined.
request_timeout(type) -> hoconsc:union([infinity, emqx_schema:duration_ms()]);
request_timeout(desc) -> ?DESC("request_timeout");
request_timeout(default) -> <<"15s">>;
@ -167,7 +149,4 @@ max_queue_bytes(default) -> ?DEFAULT_QUEUE_SIZE_RAW;
max_queue_bytes(required) -> false;
max_queue_bytes(_) -> undefined.
desc("creation_opts") ->
?DESC("creation_opts");
desc("creation_opts_sync_only") ->
?DESC("creation_opts").
desc("creation_opts") -> ?DESC("creation_opts").

View File

@ -687,10 +687,6 @@ t_jq(_) ->
got_timeout
end,
ConfigRootKey = emqx_rule_engine_schema:namespace(),
DefaultTimeOut = emqx_config:get([
ConfigRootKey,
jq_function_default_timeout
]),
?assertThrow(
{jq_exception, {timeout, _}},
apply_func(jq, [TOProgram, <<"-2">>])

View File

@ -0,0 +1,3 @@
Add support for `async` query mode for most bridges.
Before this change, some bridges (Cassandra, MongoDB, MySQL, Postgres, Redis, RocketMQ, TDengine) were only allowed to be created with a `sync` query mode.

View File

@ -86,21 +86,10 @@ fields("config") ->
mk(
binary(),
#{desc => ?DESC("local_topic"), default => undefined}
)},
{resource_opts,
mk(
ref(?MODULE, "creation_opts"),
#{
required => false,
default => #{},
desc => ?DESC(emqx_resource_schema, <<"resource_opts">>)
}
)}
] ++
] ++ emqx_resource_schema:fields("resource_opts") ++
(emqx_ee_connector_cassa:fields(config) --
emqx_connector_schema_lib:prepare_statement_fields());
fields("creation_opts") ->
emqx_resource_schema:fields("creation_opts_sync_only");
fields("post") ->
fields("post", cassandra);
fields("put") ->
@ -115,8 +104,6 @@ desc("config") ->
?DESC("desc_config");
desc(Method) when Method =:= "get"; Method =:= "put"; Method =:= "post" ->
["Configuration for Cassandra using `", string:to_upper(Method), "` method."];
desc("creation_opts" = Name) ->
emqx_resource_schema:desc(Name);
desc(_) ->
undefined.

View File

@ -38,7 +38,7 @@ fields("config") ->
{enable, mk(boolean(), #{desc => ?DESC("enable"), default => true})},
{collection, mk(binary(), #{desc => ?DESC("collection"), default => <<"mqtt">>})},
{payload_template, mk(binary(), #{required => false, desc => ?DESC("payload_template")})}
] ++ emqx_resource_schema:fields("resource_opts_sync_only");
] ++ emqx_resource_schema:fields("resource_opts");
fields(mongodb_rs) ->
emqx_connector_mongo:fields(rs) ++ fields("config");
fields(mongodb_sharded) ->

View File

@ -79,21 +79,10 @@ fields("config") ->
mk(
binary(),
#{desc => ?DESC("local_topic"), default => undefined}
)},
{resource_opts,
mk(
ref(?MODULE, "creation_opts"),
#{
required => false,
default => #{},
desc => ?DESC(emqx_resource_schema, <<"resource_opts">>)
}
)}
] ++
] ++ emqx_resource_schema:fields("resource_opts") ++
(emqx_connector_mysql:fields(config) --
emqx_connector_schema_lib:prepare_statement_fields());
fields("creation_opts") ->
emqx_resource_schema:fields("creation_opts_sync_only");
fields("post") ->
[type_field(), name_field() | fields("config")];
fields("put") ->
@ -105,8 +94,6 @@ desc("config") ->
?DESC("desc_config");
desc(Method) when Method =:= "get"; Method =:= "put"; Method =:= "post" ->
["Configuration for MySQL using `", string:to_upper(Method), "` method."];
desc("creation_opts" = Name) ->
emqx_resource_schema:desc(Name);
desc(_) ->
undefined.

View File

@ -81,21 +81,10 @@ fields("config") ->
mk(
binary(),
#{desc => ?DESC("local_topic"), default => undefined}
)},
{resource_opts,
mk(
ref(?MODULE, "creation_opts"),
#{
required => false,
default => #{},
desc => ?DESC(emqx_resource_schema, <<"resource_opts">>)
}
)}
] ++
] ++ emqx_resource_schema:fields("resource_opts") ++
(emqx_connector_pgsql:fields(config) --
emqx_connector_schema_lib:prepare_statement_fields());
fields("creation_opts") ->
emqx_resource_schema:fields("creation_opts_sync_only");
fields("post") ->
fields("post", pgsql);
fields("put") ->
@ -110,8 +99,6 @@ desc("config") ->
?DESC("desc_config");
desc(Method) when Method =:= "get"; Method =:= "put"; Method =:= "post" ->
["Configuration for PostgreSQL using `", string:to_upper(Method), "` method."];
desc("creation_opts" = Name) ->
emqx_resource_schema:desc(Name);
desc(_) ->
undefined.

View File

@ -180,10 +180,10 @@ resource_fields(Type) ->
resource_creation_fields("redis_cluster") ->
% TODO
% Cluster bridge is currently incompatible with batching.
Fields = emqx_resource_schema:fields("creation_opts_sync_only"),
Fields = emqx_resource_schema:fields("creation_opts"),
lists:foldl(fun proplists:delete/2, Fields, [batch_size, batch_time, enable_batch]);
resource_creation_fields(_) ->
emqx_resource_schema:fields("creation_opts_sync_only").
emqx_resource_schema:fields("creation_opts").
desc("config") ->
?DESC("desc_config");

View File

@ -80,21 +80,10 @@ fields("config") ->
mk(
binary(),
#{desc => ?DESC("local_topic"), required => false}
)},
{resource_opts,
mk(
ref(?MODULE, "creation_opts"),
#{
required => false,
default => #{<<"request_timeout">> => ?DEFFAULT_REQ_TIMEOUT},
desc => ?DESC(emqx_resource_schema, <<"resource_opts">>)
}
)}
] ++
] ++ emqx_resource_schema:fields("resource_opts") ++
(emqx_ee_connector_rocketmq:fields(config) --
emqx_connector_schema_lib:prepare_statement_fields());
fields("creation_opts") ->
emqx_resource_schema:fields("creation_opts_sync_only");
fields("post") ->
[type_field(), name_field() | fields("config")];
fields("put") ->
@ -106,8 +95,6 @@ desc("config") ->
?DESC("desc_config");
desc(Method) when Method =:= "get"; Method =:= "put"; Method =:= "post" ->
["Configuration for RocketMQ using `", string:to_upper(Method), "` method."];
desc("creation_opts" = Name) ->
emqx_resource_schema:desc(Name);
desc(_) ->
undefined.

View File

@ -80,19 +80,8 @@ fields("config") ->
mk(
binary(),
#{desc => ?DESC("local_topic"), default => undefined}
)},
{resource_opts,
mk(
ref(?MODULE, "creation_opts"),
#{
required => false,
default => #{},
desc => ?DESC(emqx_resource_schema, <<"resource_opts">>)
}
)}
] ++ emqx_ee_connector_tdengine:fields(config);
fields("creation_opts") ->
emqx_resource_schema:fields("creation_opts_sync_only");
] ++ emqx_resource_schema:fields("resource_opts") ++ emqx_ee_connector_tdengine:fields(config);
fields("post") ->
[type_field(), name_field() | fields("config")];
fields("put") ->
@ -104,8 +93,6 @@ desc("config") ->
?DESC("desc_config");
desc(Method) when Method =:= "get"; Method =:= "put"; Method =:= "post" ->
["Configuration for TDengine using `", string:to_upper(Method), "` method."];
desc("creation_opts" = Name) ->
emqx_resource_schema:desc(Name);
desc(_) ->
undefined.

View File

@ -73,15 +73,16 @@ all() ->
groups() ->
TCs = emqx_common_test_helpers:all(?MODULE),
NonBatchCases = [t_write_timeout],
QueryModeGroups = [{group, async}, {group, sync}],
BatchingGroups = [
%{group, with_batch},
{group, without_batch}
],
[
{tcp, [
%{group, with_batch},
{group, without_batch}
]},
{tls, [
%{group, with_batch},
{group, without_batch}
]},
{tcp, QueryModeGroups},
{tls, QueryModeGroups},
{async, BatchingGroups},
{sync, BatchingGroups},
{with_batch, TCs -- NonBatchCases},
{without_batch, TCs}
].
@ -93,7 +94,6 @@ init_per_group(tcp, Config) ->
{cassa_host, Host},
{cassa_port, Port},
{enable_tls, false},
{query_mode, sync},
{proxy_name, "cassa_tcp"}
| Config
];
@ -104,10 +104,13 @@ init_per_group(tls, Config) ->
{cassa_host, Host},
{cassa_port, Port},
{enable_tls, true},
{query_mode, sync},
{proxy_name, "cassa_tls"}
| Config
];
init_per_group(async, Config) ->
[{query_mode, async} | Config];
init_per_group(sync, Config) ->
[{query_mode, sync} | Config];
init_per_group(with_batch, Config0) ->
Config = [{enable_batch, true} | Config0],
common_init(Config);
@ -139,14 +142,15 @@ end_per_suite(_Config) ->
init_per_testcase(_Testcase, Config) ->
connect_and_clear_table(Config),
delete_bridge(Config),
snabbkaffe:start_trace(),
Config.
end_per_testcase(_Testcase, Config) ->
ProxyHost = ?config(proxy_host, Config),
ProxyPort = ?config(proxy_port, Config),
emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort),
connect_and_clear_table(Config),
ok = snabbkaffe:stop(),
connect_and_clear_table(Config),
delete_bridge(Config),
ok.
@ -171,6 +175,7 @@ common_init(Config0) ->
ok = emqx_common_test_helpers:start_apps([emqx_conf, emqx_bridge]),
emqx_mgmt_api_test_util:init_suite(),
% Connect to cassnadra directly and create the table
catch connect_and_drop_table(Config0),
connect_and_create_table(Config0),
{Name, CassaConf} = cassa_config(BridgeType, Config0),
Config =
@ -250,9 +255,13 @@ parse_and_check(ConfigString, BridgeType, Name) ->
Config.
create_bridge(Config) ->
create_bridge(Config, _Overrides = #{}).
create_bridge(Config, Overrides) ->
BridgeType = ?config(cassa_bridge_type, Config),
Name = ?config(cassa_name, Config),
BridgeConfig = ?config(cassa_config, Config),
BridgeConfig0 = ?config(cassa_config, Config),
BridgeConfig = emqx_map_lib:deep_merge(BridgeConfig0, Overrides),
emqx_bridge:create(BridgeType, Name, BridgeConfig).
delete_bridge(Config) ->
@ -288,6 +297,27 @@ query_resource(Config, Request) ->
ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name),
emqx_resource:query(ResourceID, Request, #{timeout => 1_000}).
query_resource_async(Config, Request) ->
Name = ?config(cassa_name, Config),
BridgeType = ?config(cassa_bridge_type, Config),
Ref = alias([reply]),
AsyncReplyFun = fun(Result) -> Ref ! {result, Ref, Result} end,
ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name),
Return = emqx_resource:query(ResourceID, Request, #{
timeout => 500, async_reply_fun => {AsyncReplyFun, []}
}),
{Return, Ref}.
receive_result(Ref, Timeout) when is_reference(Ref) ->
receive
{result, Ref, Result} ->
{ok, Result};
{Ref, Result} ->
{ok, Result}
after Timeout ->
timeout
end.
connect_direct_cassa(Config) ->
Opts = #{
nodes => [{?config(cassa_host, Config), ?config(cassa_port, Config)}],
@ -546,15 +576,27 @@ t_write_failure(Config) ->
% ok.
t_simple_sql_query(Config) ->
EnableBatch = ?config(enable_batch, Config),
QueryMode = ?config(query_mode, Config),
?assertMatch(
{ok, _},
create_bridge(Config)
),
Request = {query, <<"SELECT count(1) AS T FROM system.local">>},
Result = query_resource(Config, Request),
case ?config(enable_batch, Config) of
true -> ?assertEqual({error, {unrecoverable_error, batch_prepare_not_implemented}}, Result);
false -> ?assertMatch({ok, {<<"system.local">>, _, [[1]]}}, Result)
Result =
case QueryMode of
sync ->
query_resource(Config, Request);
async ->
{_, Ref} = query_resource_async(Config, Request),
{ok, Res} = receive_result(Ref, 2_000),
Res
end,
case EnableBatch of
true ->
?assertEqual({error, {unrecoverable_error, batch_prepare_not_implemented}}, Result);
false ->
?assertMatch({ok, {<<"system.local">>, _, [[1]]}}, Result)
end,
ok.
@ -565,22 +607,56 @@ t_missing_data(Config) ->
),
%% emqx_ee_connector_cassa will send missed data as a `null` atom
%% to ecql driver
Result = send_message(Config, #{}),
{_, {ok, Event}} =
?wait_async_action(
send_message(Config, #{}),
#{?snk_kind := buffer_worker_flush_ack},
2_000
),
?assertMatch(
%% TODO: match error msgs
{error, {unrecoverable_error, {8704, <<"Expected 8 or 0 byte long for date (4)">>}}},
Result
#{
result :=
{error, {unrecoverable_error, {8704, <<"Expected 8 or 0 byte long for date (4)">>}}}
},
Event
),
ok.
t_bad_sql_parameter(Config) ->
QueryMode = ?config(query_mode, Config),
EnableBatch = ?config(enable_batch, Config),
Name = ?config(cassa_name, Config),
ResourceId = emqx_bridge_resource:resource_id(cassandra, Name),
?assertMatch(
{ok, _},
create_bridge(Config)
create_bridge(
Config,
#{
<<"resource_opts">> => #{
<<"request_timeout">> => 500,
<<"resume_interval">> => 100,
<<"health_check_interval">> => 100
}
}
)
),
Request = {query, <<"">>, [bad_parameter]},
Result = query_resource(Config, Request),
case ?config(enable_batch, Config) of
Result =
case QueryMode of
sync ->
query_resource(Config, Request);
async ->
{_, Ref} = query_resource_async(Config, Request),
case receive_result(Ref, 5_000) of
{ok, Res} ->
Res;
timeout ->
ct:pal("mailbox:\n ~p", [process_info(self(), messages)]),
ct:fail("no response received")
end
end,
case EnableBatch of
true ->
?assertEqual({error, {unrecoverable_error, invalid_request}}, Result);
false ->

View File

@ -9,6 +9,7 @@
-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
%%------------------------------------------------------------------------------
%% CT boilerplate
@ -16,9 +17,8 @@
all() ->
[
{group, rs},
{group, sharded},
{group, single}
{group, async},
{group, sync}
| (emqx_common_test_helpers:all(?MODULE) -- group_tests())
].
@ -31,12 +31,23 @@ group_tests() ->
].
groups() ->
TypeGroups = [
{group, rs},
{group, sharded},
{group, single}
],
[
{async, TypeGroups},
{sync, TypeGroups},
{rs, group_tests()},
{sharded, group_tests()},
{single, group_tests()}
].
init_per_group(async, Config) ->
[{query_mode, async} | Config];
init_per_group(sync, Config) ->
[{query_mode, sync} | Config];
init_per_group(Type = rs, Config) ->
MongoHost = os:getenv("MONGO_RS_HOST", "mongo1"),
MongoPort = list_to_integer(os:getenv("MONGO_RS_PORT", "27017")),
@ -44,7 +55,7 @@ init_per_group(Type = rs, Config) ->
true ->
ok = start_apps(),
emqx_mgmt_api_test_util:init_suite(),
{Name, MongoConfig} = mongo_config(MongoHost, MongoPort, Type),
{Name, MongoConfig} = mongo_config(MongoHost, MongoPort, Type, Config),
[
{mongo_host, MongoHost},
{mongo_port, MongoPort},
@ -63,7 +74,7 @@ init_per_group(Type = sharded, Config) ->
true ->
ok = start_apps(),
emqx_mgmt_api_test_util:init_suite(),
{Name, MongoConfig} = mongo_config(MongoHost, MongoPort, Type),
{Name, MongoConfig} = mongo_config(MongoHost, MongoPort, Type, Config),
[
{mongo_host, MongoHost},
{mongo_port, MongoPort},
@ -82,7 +93,7 @@ init_per_group(Type = single, Config) ->
true ->
ok = start_apps(),
emqx_mgmt_api_test_util:init_suite(),
{Name, MongoConfig} = mongo_config(MongoHost, MongoPort, Type),
{Name, MongoConfig} = mongo_config(MongoHost, MongoPort, Type, Config),
[
{mongo_host, MongoHost},
{mongo_port, MongoPort},
@ -99,6 +110,7 @@ end_per_group(_Type, _Config) ->
ok.
init_per_suite(Config) ->
emqx_common_test_helpers:clear_screen(),
Config.
end_per_suite(_Config) ->
@ -109,11 +121,13 @@ end_per_suite(_Config) ->
init_per_testcase(_Testcase, Config) ->
catch clear_db(Config),
delete_bridge(Config),
snabbkaffe:start_trace(),
Config.
end_per_testcase(_Testcase, Config) ->
catch clear_db(Config),
delete_bridge(Config),
snabbkaffe:stop(),
ok.
%%------------------------------------------------------------------------------
@ -140,7 +154,8 @@ mongo_type_bin(sharded) ->
mongo_type_bin(single) ->
<<"mongodb_single">>.
mongo_config(MongoHost, MongoPort0, rs = Type) ->
mongo_config(MongoHost, MongoPort0, rs = Type, Config) ->
QueryMode = ?config(query_mode, Config),
MongoPort = integer_to_list(MongoPort0),
Servers = MongoHost ++ ":" ++ MongoPort,
Name = atom_to_binary(?MODULE),
@ -154,13 +169,19 @@ mongo_config(MongoHost, MongoPort0, rs = Type) ->
" w_mode = safe\n"
" database = mqtt\n"
" resource_opts = {\n"
" query_mode = ~s\n"
" worker_pool_size = 1\n"
" }\n"
"}",
[Name, Servers]
[
Name,
Servers,
QueryMode
]
),
{Name, parse_and_check(ConfigString, Type, Name)};
mongo_config(MongoHost, MongoPort0, sharded = Type) ->
mongo_config(MongoHost, MongoPort0, sharded = Type, Config) ->
QueryMode = ?config(query_mode, Config),
MongoPort = integer_to_list(MongoPort0),
Servers = MongoHost ++ ":" ++ MongoPort,
Name = atom_to_binary(?MODULE),
@ -173,13 +194,19 @@ mongo_config(MongoHost, MongoPort0, sharded = Type) ->
" w_mode = safe\n"
" database = mqtt\n"
" resource_opts = {\n"
" query_mode = ~s\n"
" worker_pool_size = 1\n"
" }\n"
"}",
[Name, Servers]
[
Name,
Servers,
QueryMode
]
),
{Name, parse_and_check(ConfigString, Type, Name)};
mongo_config(MongoHost, MongoPort0, single = Type) ->
mongo_config(MongoHost, MongoPort0, single = Type, Config) ->
QueryMode = ?config(query_mode, Config),
MongoPort = integer_to_list(MongoPort0),
Server = MongoHost ++ ":" ++ MongoPort,
Name = atom_to_binary(?MODULE),
@ -192,10 +219,15 @@ mongo_config(MongoHost, MongoPort0, single = Type) ->
" w_mode = safe\n"
" database = mqtt\n"
" resource_opts = {\n"
" query_mode = ~s\n"
" worker_pool_size = 1\n"
" }\n"
"}",
[Name, Server]
[
Name,
Server,
QueryMode
]
),
{Name, parse_and_check(ConfigString, Type, Name)}.
@ -248,7 +280,7 @@ find_all(Config) ->
Name = ?config(mongo_name, Config),
#{<<"collection">> := Collection} = ?config(mongo_config, Config),
ResourceID = emqx_bridge_resource:resource_id(Type, Name),
emqx_resource:query(ResourceID, {find, Collection, #{}, #{}}).
emqx_resource:simple_sync_query(ResourceID, {find, Collection, #{}, #{}}).
send_message(Config, Payload) ->
Name = ?config(mongo_name, Config),
@ -266,7 +298,12 @@ t_setup_via_config_and_publish(Config) ->
create_bridge(Config)
),
Val = erlang:unique_integer(),
ok = send_message(Config, #{key => Val}),
{ok, {ok, _}} =
?wait_async_action(
send_message(Config, #{key => Val}),
#{?snk_kind := mongo_ee_connector_on_query_return},
5_000
),
?assertMatch(
{ok, [#{<<"key">> := Val}]},
find_all(Config)
@ -286,7 +323,12 @@ t_setup_via_http_api_and_publish(Config) ->
create_bridge_http(MongoConfig)
),
Val = erlang:unique_integer(),
ok = send_message(Config, #{key => Val}),
{ok, {ok, _}} =
?wait_async_action(
send_message(Config, #{key => Val}),
#{?snk_kind := mongo_ee_connector_on_query_return},
5_000
),
?assertMatch(
{ok, [#{<<"key">> := Val}]},
find_all(Config)
@ -297,7 +339,12 @@ t_payload_template(Config) ->
{ok, _} = create_bridge(Config, #{<<"payload_template">> => <<"{\"foo\": \"${clientid}\"}">>}),
Val = erlang:unique_integer(),
ClientId = emqx_guid:to_hexstr(emqx_guid:gen()),
ok = send_message(Config, #{key => Val, clientid => ClientId}),
{ok, {ok, _}} =
?wait_async_action(
send_message(Config, #{key => Val, clientid => ClientId}),
#{?snk_kind := mongo_ee_connector_on_query_return},
5_000
),
?assertMatch(
{ok, [#{<<"foo">> := ClientId}]},
find_all(Config)
@ -314,11 +361,16 @@ t_collection_template(Config) ->
),
Val = erlang:unique_integer(),
ClientId = emqx_guid:to_hexstr(emqx_guid:gen()),
ok = send_message(Config, #{
key => Val,
clientid => ClientId,
mycollectionvar => <<"mycol">>
}),
{ok, {ok, _}} =
?wait_async_action(
send_message(Config, #{
key => Val,
clientid => ClientId,
mycollectionvar => <<"mycol">>
}),
#{?snk_kind := mongo_ee_connector_on_query_return},
5_000
),
?assertMatch(
{ok, [#{<<"foo">> := ClientId}]},
find_all(Config)

View File

@ -45,15 +45,16 @@ all() ->
groups() ->
TCs = emqx_common_test_helpers:all(?MODULE),
NonBatchCases = [t_write_timeout, t_uninitialized_prepared_statement],
BatchingGroups = [
{group, with_batch},
{group, without_batch}
],
QueryModeGroups = [{group, async}, {group, sync}],
[
{tcp, [
{group, with_batch},
{group, without_batch}
]},
{tls, [
{group, with_batch},
{group, without_batch}
]},
{tcp, QueryModeGroups},
{tls, QueryModeGroups},
{async, BatchingGroups},
{sync, BatchingGroups},
{with_batch, TCs -- NonBatchCases},
{without_batch, TCs}
].
@ -65,7 +66,6 @@ init_per_group(tcp, Config) ->
{mysql_host, MysqlHost},
{mysql_port, MysqlPort},
{enable_tls, false},
{query_mode, sync},
{proxy_name, "mysql_tcp"}
| Config
];
@ -76,10 +76,13 @@ init_per_group(tls, Config) ->
{mysql_host, MysqlHost},
{mysql_port, MysqlPort},
{enable_tls, true},
{query_mode, sync},
{proxy_name, "mysql_tls"}
| Config
];
init_per_group(async, Config) ->
[{query_mode, async} | Config];
init_per_group(sync, Config) ->
[{query_mode, sync} | Config];
init_per_group(with_batch, Config0) ->
Config = [{batch_size, 100} | Config0],
common_init(Config);
@ -99,6 +102,7 @@ end_per_group(_Group, _Config) ->
ok.
init_per_suite(Config) ->
emqx_common_test_helpers:clear_screen(),
Config.
end_per_suite(_Config) ->
@ -109,6 +113,7 @@ end_per_suite(_Config) ->
init_per_testcase(_Testcase, Config) ->
connect_and_clear_table(Config),
delete_bridge(Config),
snabbkaffe:start_trace(),
Config.
end_per_testcase(_Testcase, Config) ->
@ -237,6 +242,25 @@ query_resource(Config, Request) ->
ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name),
emqx_resource:query(ResourceID, Request, #{timeout => 500}).
query_resource_async(Config, Request) ->
Name = ?config(mysql_name, Config),
BridgeType = ?config(mysql_bridge_type, Config),
Ref = alias([reply]),
AsyncReplyFun = fun(Result) -> Ref ! {result, Ref, Result} end,
ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name),
Return = emqx_resource:query(ResourceID, Request, #{
timeout => 500, async_reply_fun => {AsyncReplyFun, []}
}),
{Return, Ref}.
receive_result(Ref, Timeout) ->
receive
{result, Ref, Result} ->
{ok, Result}
after Timeout ->
timeout
end.
unprepare(Config, Key) ->
Name = ?config(mysql_name, Config),
BridgeType = ?config(mysql_bridge_type, Config),
@ -409,17 +433,29 @@ t_write_failure(Config) ->
Val = integer_to_binary(erlang:unique_integer()),
SentData = #{payload => Val, timestamp => 1668602148000},
?check_trace(
emqx_common_test_helpers:with_failure(down, ProxyName, ProxyHost, ProxyPort, fun() ->
case QueryMode of
sync ->
?assertMatch(
{error, {resource_error, #{reason := timeout}}},
begin
%% for some unknown reason, `?wait_async_action' and `subscribe'
%% hang and timeout if called inside `with_failure', but the event
%% happens and is emitted after the test pid dies!?
{ok, SRef} = snabbkaffe:subscribe(
?match_event(#{?snk_kind := buffer_worker_flush_nack}),
2_000
),
emqx_common_test_helpers:with_failure(down, ProxyName, ProxyHost, ProxyPort, fun() ->
case QueryMode of
sync ->
?assertMatch(
{error, {resource_error, #{reason := timeout}}},
send_message(Config, SentData)
);
async ->
send_message(Config, SentData)
);
async ->
send_message(Config, SentData)
end
end),
end,
?assertMatch({ok, [#{result := {error, _}}]}, snabbkaffe:receive_events(SRef)),
ok
end),
ok
end,
fun(Trace0) ->
ct:pal("trace: ~p", [Trace0]),
Trace = ?of_kind(buffer_worker_flush_nack, Trace0),
@ -443,27 +479,52 @@ t_write_timeout(Config) ->
ProxyName = ?config(proxy_name, Config),
ProxyPort = ?config(proxy_port, Config),
ProxyHost = ?config(proxy_host, Config),
QueryMode = ?config(query_mode, Config),
{ok, _} = create_bridge(Config),
Val = integer_to_binary(erlang:unique_integer()),
SentData = #{payload => Val, timestamp => 1668602148000},
Timeout = 1000,
%% for some unknown reason, `?wait_async_action' and `subscribe'
%% hang and timeout if called inside `with_failure', but the event
%% happens and is emitted after the test pid dies!?
{ok, SRef} = snabbkaffe:subscribe(
?match_event(#{?snk_kind := buffer_worker_flush_nack}),
2 * Timeout
),
emqx_common_test_helpers:with_failure(timeout, ProxyName, ProxyHost, ProxyPort, fun() ->
?assertMatch(
{error, {resource_error, #{reason := timeout}}},
query_resource(Config, {send_message, SentData, [], Timeout})
)
case QueryMode of
sync ->
?assertMatch(
{error, {resource_error, #{reason := timeout}}},
query_resource(Config, {send_message, SentData, [], Timeout})
);
async ->
query_resource(Config, {send_message, SentData, [], Timeout}),
ok
end,
ok
end),
?assertMatch({ok, [#{result := {error, _}}]}, snabbkaffe:receive_events(SRef)),
ok.
t_simple_sql_query(Config) ->
QueryMode = ?config(query_mode, Config),
BatchSize = ?config(batch_size, Config),
IsBatch = BatchSize > 1,
?assertMatch(
{ok, _},
create_bridge(Config)
),
Request = {sql, <<"SELECT count(1) AS T">>},
Result = query_resource(Config, Request),
BatchSize = ?config(batch_size, Config),
IsBatch = BatchSize > 1,
Result =
case QueryMode of
sync ->
query_resource(Config, Request);
async ->
{_, Ref} = query_resource_async(Config, Request),
{ok, Res} = receive_result(Ref, 2_000),
Res
end,
case IsBatch of
true -> ?assertEqual({error, {unrecoverable_error, batch_select_not_implemented}}, Result);
false -> ?assertEqual({ok, [<<"T">>], [[1]]}, Result)
@ -471,25 +532,37 @@ t_simple_sql_query(Config) ->
ok.
t_missing_data(Config) ->
BatchSize = ?config(batch_size, Config),
IsBatch = BatchSize > 1,
?assertMatch(
{ok, _},
create_bridge(Config)
),
Result = send_message(Config, #{}),
BatchSize = ?config(batch_size, Config),
IsBatch = BatchSize > 1,
{ok, SRef} = snabbkaffe:subscribe(
?match_event(#{?snk_kind := buffer_worker_flush_ack}),
2_000
),
send_message(Config, #{}),
{ok, [Event]} = snabbkaffe:receive_events(SRef),
case IsBatch of
true ->
?assertMatch(
{error,
{unrecoverable_error,
{1292, _, <<"Truncated incorrect DOUBLE value: 'undefined'">>}}},
Result
#{
result :=
{error,
{unrecoverable_error,
{1292, _, <<"Truncated incorrect DOUBLE value: 'undefined'">>}}}
},
Event
);
false ->
?assertMatch(
{error, {unrecoverable_error, {1048, _, <<"Column 'arrived' cannot be null">>}}},
Result
#{
result :=
{error,
{unrecoverable_error, {1048, _, <<"Column 'arrived' cannot be null">>}}}
},
Event
)
end,
ok.
@ -500,14 +573,22 @@ t_bad_sql_parameter(Config) ->
create_bridge(Config)
),
Request = {sql, <<"">>, [bad_parameter]},
Result = query_resource(Config, Request),
{_, {ok, Event}} =
?wait_async_action(
query_resource(Config, Request),
#{?snk_kind := buffer_worker_flush_ack},
2_000
),
BatchSize = ?config(batch_size, Config),
IsBatch = BatchSize > 1,
case IsBatch of
true ->
?assertEqual({error, {unrecoverable_error, invalid_request}}, Result);
?assertMatch(#{result := {error, {unrecoverable_error, invalid_request}}}, Event);
false ->
?assertEqual({error, {unrecoverable_error, {invalid_params, [bad_parameter]}}}, Result)
?assertMatch(
#{result := {error, {unrecoverable_error, {invalid_params, [bad_parameter]}}}},
Event
)
end,
ok.
@ -515,7 +596,12 @@ t_nasty_sql_string(Config) ->
?assertMatch({ok, _}, create_bridge(Config)),
Payload = list_to_binary(lists:seq(0, 255)),
Message = #{payload => Payload, timestamp => erlang:system_time(millisecond)},
Result = send_message(Config, Message),
{Result, {ok, _}} =
?wait_async_action(
send_message(Config, Message),
#{?snk_kind := mysql_connector_query_return},
1_000
),
?assertEqual(ok, Result),
?assertMatch(
{ok, [<<"payload">>], [[Payload]]},
@ -561,12 +647,22 @@ t_unprepared_statement_query(Config) ->
create_bridge(Config)
),
Request = {prepared_query, unprepared_query, []},
Result = query_resource(Config, Request),
{_, {ok, Event}} =
?wait_async_action(
query_resource(Config, Request),
#{?snk_kind := buffer_worker_flush_ack},
2_000
),
BatchSize = ?config(batch_size, Config),
IsBatch = BatchSize > 1,
case IsBatch of
true -> ?assertEqual({error, {unrecoverable_error, invalid_request}}, Result);
false -> ?assertEqual({error, {unrecoverable_error, prepared_statement_invalid}}, Result)
true ->
?assertMatch(#{result := {error, {unrecoverable_error, invalid_request}}}, Event);
false ->
?assertMatch(
#{result := {error, {unrecoverable_error, prepared_statement_invalid}}},
Event
)
end,
ok.
@ -582,7 +678,13 @@ t_uninitialized_prepared_statement(Config) ->
unprepare(Config, send_message),
?check_trace(
begin
?assertEqual(ok, send_message(Config, SentData)),
{Res, {ok, _}} =
?wait_async_action(
send_message(Config, SentData),
#{?snk_kind := mysql_connector_query_return},
2_000
),
?assertEqual(ok, Res),
ok
end,
fun(Trace) ->

View File

@ -42,19 +42,18 @@ all() ->
groups() ->
TCs = emqx_common_test_helpers:all(?MODULE),
NonBatchCases = [t_write_timeout],
BatchVariantGroups = [
{group, with_batch},
{group, without_batch},
{group, matrix},
{group, timescale}
],
QueryModeGroups = [{async, BatchVariantGroups}, {sync, BatchVariantGroups}],
[
{tcp, [
{group, with_batch},
{group, without_batch},
{group, matrix},
{group, timescale}
]},
{tls, [
{group, with_batch},
{group, without_batch},
{group, matrix},
{group, timescale}
]},
{tcp, QueryModeGroups},
{tls, QueryModeGroups},
{async, BatchVariantGroups},
{sync, BatchVariantGroups},
{with_batch, TCs -- NonBatchCases},
{without_batch, TCs},
{matrix, [t_setup_via_config_and_publish, t_setup_via_http_api_and_publish]},
@ -68,7 +67,6 @@ init_per_group(tcp, Config) ->
{pgsql_host, Host},
{pgsql_port, Port},
{enable_tls, false},
{query_mode, sync},
{proxy_name, "pgsql_tcp"}
| Config
];
@ -79,10 +77,13 @@ init_per_group(tls, Config) ->
{pgsql_host, Host},
{pgsql_port, Port},
{enable_tls, true},
{query_mode, sync},
{proxy_name, "pgsql_tls"}
| Config
];
init_per_group(async, Config) ->
[{query_mode, async} | Config];
init_per_group(sync, Config) ->
[{query_mode, sync} | Config];
init_per_group(with_batch, Config0) ->
Config = [{enable_batch, true} | Config0],
common_init(Config);
@ -118,6 +119,7 @@ end_per_suite(_Config) ->
init_per_testcase(_Testcase, Config) ->
connect_and_clear_table(Config),
delete_bridge(Config),
snabbkaffe:start_trace(),
Config.
end_per_testcase(_Testcase, Config) ->
@ -221,9 +223,13 @@ parse_and_check(ConfigString, BridgeType, Name) ->
Config.
create_bridge(Config) ->
create_bridge(Config, _Overrides = #{}).
create_bridge(Config, Overrides) ->
BridgeType = ?config(pgsql_bridge_type, Config),
Name = ?config(pgsql_name, Config),
PGConfig = ?config(pgsql_config, Config),
PGConfig0 = ?config(pgsql_config, Config),
PGConfig = emqx_map_lib:deep_merge(PGConfig0, Overrides),
emqx_bridge:create(BridgeType, Name, PGConfig).
delete_bridge(Config) ->
@ -251,6 +257,27 @@ query_resource(Config, Request) ->
ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name),
emqx_resource:query(ResourceID, Request, #{timeout => 1_000}).
query_resource_async(Config, Request) ->
Name = ?config(pgsql_name, Config),
BridgeType = ?config(pgsql_bridge_type, Config),
Ref = alias([reply]),
AsyncReplyFun = fun(Result) -> Ref ! {result, Ref, Result} end,
ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name),
Return = emqx_resource:query(ResourceID, Request, #{
timeout => 500, async_reply_fun => {AsyncReplyFun, []}
}),
{Return, Ref}.
receive_result(Ref, Timeout) ->
receive
{result, Ref, Result} ->
{ok, Result};
{Ref, Result} ->
{ok, Result}
after Timeout ->
timeout
end.
connect_direct_pgsql(Config) ->
Opts = #{
host => ?config(pgsql_host, Config),
@ -308,11 +335,12 @@ t_setup_via_config_and_publish(Config) ->
SentData = #{payload => Val, timestamp => 1668602148000},
?check_trace(
begin
?wait_async_action(
?assertEqual({ok, 1}, send_message(Config, SentData)),
#{?snk_kind := pgsql_connector_query_return},
10_000
),
{_, {ok, _}} =
?wait_async_action(
send_message(Config, SentData),
#{?snk_kind := pgsql_connector_query_return},
10_000
),
?assertMatch(
Val,
connect_and_get_payload(Config)
@ -336,6 +364,7 @@ t_setup_via_http_api_and_publish(Config) ->
BridgeType = ?config(pgsql_bridge_type, Config),
Name = ?config(pgsql_name, Config),
PgsqlConfig0 = ?config(pgsql_config, Config),
QueryMode = ?config(query_mode, Config),
PgsqlConfig = PgsqlConfig0#{
<<"name">> => Name,
<<"type">> => BridgeType
@ -348,11 +377,18 @@ t_setup_via_http_api_and_publish(Config) ->
SentData = #{payload => Val, timestamp => 1668602148000},
?check_trace(
begin
?wait_async_action(
?assertEqual({ok, 1}, send_message(Config, SentData)),
#{?snk_kind := pgsql_connector_query_return},
10_000
),
{Res, {ok, _}} =
?wait_async_action(
send_message(Config, SentData),
#{?snk_kind := pgsql_connector_query_return},
10_000
),
case QueryMode of
async ->
ok;
sync ->
?assertEqual({ok, 1}, Res)
end,
?assertMatch(
Val,
connect_and_get_payload(Config)
@ -457,28 +493,71 @@ t_write_timeout(Config) ->
ProxyName = ?config(proxy_name, Config),
ProxyPort = ?config(proxy_port, Config),
ProxyHost = ?config(proxy_host, Config),
{ok, _} = create_bridge(Config),
QueryMode = ?config(query_mode, Config),
{ok, _} = create_bridge(
Config,
#{
<<"resource_opts">> => #{
<<"request_timeout">> => 500,
<<"resume_interval">> => 100,
<<"health_check_interval">> => 100
}
}
),
Val = integer_to_binary(erlang:unique_integer()),
SentData = #{payload => Val, timestamp => 1668602148000},
Timeout = 1000,
emqx_common_test_helpers:with_failure(timeout, ProxyName, ProxyHost, ProxyPort, fun() ->
?assertMatch(
{error, {resource_error, #{reason := timeout}}},
query_resource(Config, {send_message, SentData, [], Timeout})
)
end),
{ok, SRef} = snabbkaffe:subscribe(
?match_event(#{?snk_kind := call_query_enter}),
2_000
),
Res0 =
emqx_common_test_helpers:with_failure(timeout, ProxyName, ProxyHost, ProxyPort, fun() ->
Res1 =
case QueryMode of
async ->
query_resource_async(Config, {send_message, SentData});
sync ->
query_resource(Config, {send_message, SentData})
end,
?assertMatch({ok, [_]}, snabbkaffe:receive_events(SRef)),
Res1
end),
case Res0 of
{_, Ref} when is_reference(Ref) ->
case receive_result(Ref, 15_000) of
{ok, Res} ->
?assertMatch({error, {unrecoverable_error, _}}, Res);
timeout ->
ct:pal("mailbox:\n ~p", [process_info(self(), messages)]),
ct:fail("no response received")
end;
_ ->
?assertMatch({error, {resource_error, #{reason := timeout}}}, Res0)
end,
ok.
t_simple_sql_query(Config) ->
EnableBatch = ?config(enable_batch, Config),
QueryMode = ?config(query_mode, Config),
?assertMatch(
{ok, _},
create_bridge(Config)
),
Request = {sql, <<"SELECT count(1) AS T">>},
Result = query_resource(Config, Request),
case ?config(enable_batch, Config) of
true -> ?assertEqual({error, {unrecoverable_error, batch_prepare_not_implemented}}, Result);
false -> ?assertMatch({ok, _, [{1}]}, Result)
Result =
case QueryMode of
sync ->
query_resource(Config, Request);
async ->
{_, Ref} = query_resource_async(Config, Request),
{ok, Res} = receive_result(Ref, 2_000),
Res
end,
case EnableBatch of
true ->
?assertEqual({error, {unrecoverable_error, batch_prepare_not_implemented}}, Result);
false ->
?assertMatch({ok, _, [{1}]}, Result)
end,
ok.
@ -487,21 +566,40 @@ t_missing_data(Config) ->
{ok, _},
create_bridge(Config)
),
Result = send_message(Config, #{}),
{_, {ok, Event}} =
?wait_async_action(
send_message(Config, #{}),
#{?snk_kind := buffer_worker_flush_ack},
2_000
),
?assertMatch(
{error, {unrecoverable_error, {error, error, <<"23502">>, not_null_violation, _, _}}},
Result
#{
result :=
{error,
{unrecoverable_error, {error, error, <<"23502">>, not_null_violation, _, _}}}
},
Event
),
ok.
t_bad_sql_parameter(Config) ->
QueryMode = ?config(query_mode, Config),
EnableBatch = ?config(enable_batch, Config),
?assertMatch(
{ok, _},
create_bridge(Config)
),
Request = {sql, <<"">>, [bad_parameter]},
Result = query_resource(Config, Request),
case ?config(enable_batch, Config) of
Result =
case QueryMode of
sync ->
query_resource(Config, Request);
async ->
{_, Ref} = query_resource_async(Config, Request),
{ok, Res} = receive_result(Ref, 2_000),
Res
end,
case EnableBatch of
true ->
?assertEqual({error, {unrecoverable_error, invalid_request}}, Result);
false ->
@ -515,5 +613,10 @@ t_nasty_sql_string(Config) ->
?assertMatch({ok, _}, create_bridge(Config)),
Payload = list_to_binary(lists:seq(1, 127)),
Message = #{payload => Payload, timestamp => erlang:system_time(millisecond)},
?assertEqual({ok, 1}, send_message(Config, Message)),
{_, {ok, _}} =
?wait_async_action(
send_message(Config, Message),
#{?snk_kind := pgsql_connector_query_return},
1_000
),
?assertEqual(Payload, connect_and_get_payload(Config)).

View File

@ -64,14 +64,17 @@ groups() ->
{group, batch_on},
{group, batch_off}
],
QueryModeGroups = [{group, async}, {group, sync}],
[
{rest, TCs},
{transports, [
{group, tcp},
{group, tls}
]},
{tcp, TypeGroups},
{tls, TypeGroups},
{tcp, QueryModeGroups},
{tls, QueryModeGroups},
{async, TypeGroups},
{sync, TypeGroups},
{redis_single, BatchGroups},
{redis_sentinel, BatchGroups},
{redis_cluster, BatchGroups},
@ -79,6 +82,10 @@ groups() ->
{batch_off, ResourceSpecificTCs}
].
init_per_group(async, Config) ->
[{query_mode, async} | Config];
init_per_group(sync, Config) ->
[{query_mode, sync} | Config];
init_per_group(Group, Config) when
Group =:= redis_single; Group =:= redis_sentinel; Group =:= redis_cluster
->
@ -149,8 +156,9 @@ init_per_testcase(_Testcase, Config) ->
{skip, "Batching is not supported by 'redis_cluster' bridge type"};
{RedisType, BatchMode} ->
Transport = ?config(transport, Config),
QueryMode = ?config(query_mode, Config),
#{RedisType := #{Transport := RedisConnConfig}} = redis_connect_configs(),
#{BatchMode := ResourceConfig} = resource_configs(),
#{BatchMode := ResourceConfig} = resource_configs(#{query_mode => QueryMode}),
IsBatch = (BatchMode =:= batch_on),
BridgeConfig0 = maps:merge(RedisConnConfig, ?COMMON_REDIS_OPTS),
BridgeConfig1 = BridgeConfig0#{<<"resource_opts">> => ResourceConfig},
@ -301,7 +309,7 @@ t_permanent_error(_Config) ->
?wait_async_action(
publish_message(Topic, Payload),
#{?snk_kind := redis_ee_connector_send_done},
10000
10_000
)
end,
fun(Trace) ->
@ -529,14 +537,14 @@ invalid_command_bridge_config() ->
<<"command_template">> => [<<"BAD">>, <<"COMMAND">>, <<"${payload}">>]
}.
resource_configs() ->
resource_configs(#{query_mode := QueryMode}) ->
#{
batch_off => #{
<<"query_mode">> => <<"sync">>,
<<"query_mode">> => atom_to_binary(QueryMode),
<<"start_timeout">> => <<"15s">>
},
batch_on => #{
<<"query_mode">> => <<"sync">>,
<<"query_mode">> => atom_to_binary(QueryMode),
<<"worker_pool_size">> => <<"1">>,
<<"batch_size">> => integer_to_binary(?BATCH_SIZE),
<<"start_timeout">> => <<"15s">>,

View File

@ -24,17 +24,24 @@
all() ->
[
{group, with_batch},
{group, without_batch}
{group, async},
{group, sync}
].
groups() ->
TCs = emqx_common_test_helpers:all(?MODULE),
BatchingGroups = [{group, with_batch}, {group, without_batch}],
[
{async, BatchingGroups},
{sync, BatchingGroups},
{with_batch, TCs},
{without_batch, TCs}
].
init_per_group(async, Config) ->
[{query_mode, async} | Config];
init_per_group(sync, Config) ->
[{query_mode, sync} | Config];
init_per_group(with_batch, Config0) ->
Config = [{batch_size, ?BATCH_SIZE} | Config0],
common_init(Config);
@ -84,7 +91,6 @@ common_init(ConfigT) ->
Config0 = [
{host, Host},
{port, Port},
{query_mode, sync},
{proxy_name, "rocketmq"}
| ConfigT
],

View File

@ -46,18 +46,25 @@
all() ->
[
{group, with_batch},
{group, without_batch}
{group, async},
{group, sync}
].
groups() ->
TCs = emqx_common_test_helpers:all(?MODULE),
NonBatchCases = [t_write_timeout],
BatchingGroups = [{group, with_batch}, {group, without_batch}],
[
{async, BatchingGroups},
{sync, BatchingGroups},
{with_batch, TCs -- NonBatchCases},
{without_batch, TCs}
].
init_per_group(async, Config) ->
[{query_mode, async} | Config];
init_per_group(sync, Config) ->
[{query_mode, sync} | Config];
init_per_group(with_batch, Config0) ->
Config = [{enable_batch, true} | Config0],
common_init(Config);
@ -87,6 +94,7 @@ end_per_suite(_Config) ->
init_per_testcase(_Testcase, Config) ->
connect_and_clear_table(Config),
delete_bridge(Config),
snabbkaffe:start_trace(),
Config.
end_per_testcase(_Testcase, Config) ->
@ -109,7 +117,6 @@ common_init(ConfigT) ->
Config0 = [
{td_host, Host},
{td_port, Port},
{query_mode, sync},
{proxy_name, "tdengine_restful"}
| ConfigT
],
@ -194,9 +201,13 @@ parse_and_check(ConfigString, BridgeType, Name) ->
Config.
create_bridge(Config) ->
create_bridge(Config, _Overrides = #{}).
create_bridge(Config, Overrides) ->
BridgeType = ?config(tdengine_bridge_type, Config),
Name = ?config(tdengine_name, Config),
TDConfig = ?config(tdengine_config, Config),
TDConfig0 = ?config(tdengine_config, Config),
TDConfig = emqx_map_lib:deep_merge(TDConfig0, Overrides),
emqx_bridge:create(BridgeType, Name, TDConfig).
delete_bridge(Config) ->
@ -224,6 +235,27 @@ query_resource(Config, Request) ->
ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name),
emqx_resource:query(ResourceID, Request, #{timeout => 1_000}).
query_resource_async(Config, Request) ->
Name = ?config(tdengine_name, Config),
BridgeType = ?config(tdengine_bridge_type, Config),
Ref = alias([reply]),
AsyncReplyFun = fun(Result) -> Ref ! {result, Ref, Result} end,
ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name),
Return = emqx_resource:query(ResourceID, Request, #{
timeout => 500, async_reply_fun => {AsyncReplyFun, []}
}),
{Return, Ref}.
receive_result(Ref, Timeout) ->
receive
{result, Ref, Result} ->
{ok, Result};
{Ref, Result} ->
{ok, Result}
after Timeout ->
timeout
end.
connect_direct_tdengine(Config) ->
Opts = [
{host, to_bin(?config(td_host, Config))},
@ -273,12 +305,14 @@ t_setup_via_config_and_publish(Config) ->
SentData = #{payload => ?PAYLOAD, timestamp => 1668602148000},
?check_trace(
begin
?wait_async_action(
?assertMatch(
{ok, #{<<"code">> := 0, <<"rows">> := 1}}, send_message(Config, SentData)
{_, {ok, #{result := Result}}} =
?wait_async_action(
send_message(Config, SentData),
#{?snk_kind := buffer_worker_flush_ack},
2_000
),
#{?snk_kind := tdengine_connector_query_return},
10_000
?assertMatch(
{ok, #{<<"code">> := 0, <<"rows">> := 1}}, Result
),
?assertMatch(
?PAYLOAD,
@ -297,24 +331,32 @@ t_setup_via_config_and_publish(Config) ->
t_setup_via_http_api_and_publish(Config) ->
BridgeType = ?config(tdengine_bridge_type, Config),
Name = ?config(tdengine_name, Config),
PgsqlConfig0 = ?config(tdengine_config, Config),
PgsqlConfig = PgsqlConfig0#{
QueryMode = ?config(query_mode, Config),
TDengineConfig0 = ?config(tdengine_config, Config),
TDengineConfig = TDengineConfig0#{
<<"name">> => Name,
<<"type">> => BridgeType
},
?assertMatch(
{ok, _},
create_bridge_http(PgsqlConfig)
create_bridge_http(TDengineConfig)
),
SentData = #{payload => ?PAYLOAD, timestamp => 1668602148000},
?check_trace(
begin
?wait_async_action(
?assertMatch(
{ok, #{<<"code">> := 0, <<"rows">> := 1}}, send_message(Config, SentData)
),
#{?snk_kind := tdengine_connector_query_return},
10_000
Request = {send_message, SentData},
Res0 =
case QueryMode of
sync ->
query_resource(Config, Request);
async ->
{_, Ref} = query_resource_async(Config, Request),
{ok, Res} = receive_result(Ref, 2_000),
Res
end,
?assertMatch(
{ok, #{<<"code">> := 0, <<"rows">> := 1}}, Res0
),
?assertMatch(
?PAYLOAD,
@ -359,7 +401,14 @@ t_write_failure(Config) ->
{ok, _} = create_bridge(Config),
SentData = #{payload => ?PAYLOAD, timestamp => 1668602148000},
emqx_common_test_helpers:with_failure(down, ProxyName, ProxyHost, ProxyPort, fun() ->
?assertMatch({error, econnrefused}, send_message(Config, SentData))
{_, {ok, #{result := Result}}} =
?wait_async_action(
send_message(Config, SentData),
#{?snk_kind := buffer_worker_flush_ack},
2_000
),
?assertMatch({error, econnrefused}, Result),
ok
end),
ok.
@ -369,24 +418,50 @@ t_write_timeout(Config) ->
ProxyName = ?config(proxy_name, Config),
ProxyPort = ?config(proxy_port, Config),
ProxyHost = ?config(proxy_host, Config),
{ok, _} = create_bridge(Config),
QueryMode = ?config(query_mode, Config),
{ok, _} = create_bridge(
Config,
#{
<<"resource_opts">> => #{
<<"request_timeout">> => 500,
<<"resume_interval">> => 100,
<<"health_check_interval">> => 100
}
}
),
SentData = #{payload => ?PAYLOAD, timestamp => 1668602148000},
emqx_common_test_helpers:with_failure(timeout, ProxyName, ProxyHost, ProxyPort, fun() ->
?assertMatch(
{error, {resource_error, #{reason := timeout}}},
query_resource(Config, {send_message, SentData})
)
end),
%% FIXME: TDengine connector hangs indefinetily during
%% `call_query' while the connection is unresponsive. Should add
%% a timeout to `APPLY_RESOURCE' in buffer worker??
case QueryMode of
sync ->
emqx_common_test_helpers:with_failure(
timeout, ProxyName, ProxyHost, ProxyPort, fun() ->
?assertMatch(
{error, {resource_error, #{reason := timeout}}},
query_resource(Config, {send_message, SentData})
)
end
);
async ->
ct:comment("tdengine connector hangs the buffer worker forever")
end,
ok.
t_simple_sql_query(Config) ->
EnableBatch = ?config(enable_batch, Config),
?assertMatch(
{ok, _},
create_bridge(Config)
),
Request = {query, <<"SELECT count(1) AS T">>},
Result = query_resource(Config, Request),
case ?config(enable_batch, Config) of
{_, {ok, #{result := Result}}} =
?wait_async_action(
query_resource(Config, Request),
#{?snk_kind := buffer_worker_flush_ack},
2_000
),
case EnableBatch of
true ->
?assertEqual({error, {unrecoverable_error, batch_prepare_not_implemented}}, Result);
false ->
@ -399,7 +474,12 @@ t_missing_data(Config) ->
{ok, _},
create_bridge(Config)
),
Result = send_message(Config, #{}),
{_, {ok, #{result := Result}}} =
?wait_async_action(
send_message(Config, #{}),
#{?snk_kind := buffer_worker_flush_ack},
2_000
),
?assertMatch(
{error, #{
<<"code">> := 534,
@ -410,13 +490,19 @@ t_missing_data(Config) ->
ok.
t_bad_sql_parameter(Config) ->
EnableBatch = ?config(enable_batch, Config),
?assertMatch(
{ok, _},
create_bridge(Config)
),
Request = {sql, <<"">>, [bad_parameter]},
Result = query_resource(Config, Request),
case ?config(enable_batch, Config) of
{_, {ok, #{result := Result}}} =
?wait_async_action(
query_resource(Config, Request),
#{?snk_kind := buffer_worker_flush_ack},
2_000
),
case EnableBatch of
true ->
?assertEqual({error, {unrecoverable_error, invalid_request}}, Result);
false ->
@ -443,9 +529,15 @@ t_nasty_sql_string(Config) ->
% [1]: https://github.com/taosdata/TDengine/blob/066cb34a/source/libs/parser/src/parUtil.c#L279-L301
Payload = list_to_binary(lists:seq(1, 127)),
Message = #{payload => Payload, timestamp => erlang:system_time(millisecond)},
{_, {ok, #{result := Result}}} =
?wait_async_action(
send_message(Config, Message),
#{?snk_kind := buffer_worker_flush_ack},
2_000
),
?assertMatch(
{ok, #{<<"code">> := 0, <<"rows">> := 1}},
send_message(Config, Message)
Result
),
?assertEqual(
Payload,

View File

@ -60,7 +60,9 @@ on_query(InstanceId, {send_message, Message0}, State) ->
collection => emqx_plugin_libs_rule:proc_tmpl(CollectionTemplate, Message0)
},
Message = render_message(PayloadTemplate, Message0),
emqx_connector_mongo:on_query(InstanceId, {send_message, Message}, NewConnectorState);
Res = emqx_connector_mongo:on_query(InstanceId, {send_message, Message}, NewConnectorState),
?tp(mongo_ee_connector_on_query_return, #{result => Res}),
Res;
on_query(InstanceId, Request, _State = #{connector_state := ConnectorState}) ->
emqx_connector_mongo:on_query(InstanceId, Request, ConnectorState).

View File

@ -100,17 +100,6 @@ For bridges only have ingress direction data flow, it can be set to 0 otherwise
}
}
query_mode_sync_only {
desc {
en: """Query mode. Only support 'sync'."""
zh: """请求模式。目前只支持同步模式。"""
}
label {
en: """Query mode"""
zh: """请求模式"""
}
}
request_timeout {
desc {
en: """Starting from the moment when the request enters the buffer, if the request remains in the buffer for the specified time or is sent but does not receive a response or acknowledgement in time, the request is considered expired."""