761 lines
25 KiB
Erlang
761 lines
25 KiB
Erlang
%%--------------------------------------------------------------------
|
|
%% Copyright (c) 2022-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
|
|
%%--------------------------------------------------------------------
|
|
|
|
-module(emqx_bridge_cassandra_SUITE).
|
|
|
|
-compile(nowarn_export_all).
|
|
-compile(export_all).
|
|
|
|
-include_lib("eunit/include/eunit.hrl").
|
|
-include_lib("common_test/include/ct.hrl").
|
|
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
|
|
|
|
%% To run this test locally:
|
|
%% ./scripts/ct/run.sh --app apps/emqx_bridge_cassandra --only-up
|
|
%% PROFILE=emqx-enterprise PROXY_HOST=localhost CASSA_TLS_HOST=localhost \
|
|
%% CASSA_TLS_PORT=19142 CASSA_TCP_HOST=localhost CASSA_TCP_NO_AUTH_HOST=localhost \
|
|
%% CASSA_TCP_PORT=19042 CASSA_TCP_NO_AUTH_PORT=19043 \
|
|
%% ./rebar3 ct --name 'test@127.0.0.1' -v --suite \
|
|
%% apps/emqx_bridge_cassandra/test/emqx_bridge_cassandra_SUITE
|
|
|
|
-import(emqx_common_test_helpers, [on_exit/1]).
|
|
|
|
% SQL definitions
|
|
-define(SQL_BRIDGE,
|
|
"insert into mqtt_msg_test(topic, payload, arrived) "
|
|
"values (${topic}, ${payload}, ${timestamp})"
|
|
).
|
|
-define(SQL_CREATE_TABLE,
|
|
""
|
|
"\n"
|
|
"CREATE TABLE mqtt.mqtt_msg_test (\n"
|
|
" topic text,\n"
|
|
" payload text,\n"
|
|
" arrived timestamp,\n"
|
|
" PRIMARY KEY (topic)\n"
|
|
");\n"
|
|
""
|
|
).
|
|
-define(SQL_DROP_TABLE, "DROP TABLE mqtt.mqtt_msg_test").
|
|
-define(SQL_DELETE, "TRUNCATE mqtt.mqtt_msg_test").
|
|
-define(SQL_SELECT, "SELECT payload FROM mqtt.mqtt_msg_test").
|
|
|
|
% DB defaults
|
|
-define(CASSA_KEYSPACE, "mqtt").
|
|
-define(CASSA_USERNAME, "cassandra").
|
|
-define(CASSA_PASSWORD, "cassandra").
|
|
-define(BATCH_SIZE, 10).
|
|
|
|
%% cert files for client
|
|
-define(CERT_ROOT,
|
|
filename:join([emqx_common_test_helpers:proj_root(), ".ci", "docker-compose-file", "certs"])
|
|
).
|
|
|
|
-define(CAFILE, filename:join(?CERT_ROOT, ["ca.crt"])).
|
|
-define(CERTFILE, filename:join(?CERT_ROOT, ["client.pem"])).
|
|
-define(KEYFILE, filename:join(?CERT_ROOT, ["client.key"])).
|
|
|
|
%% How to run it locally:
|
|
%% 1. Start all deps services
|
|
%% sudo docker compose -f .ci/docker-compose-file/docker-compose.yaml \
|
|
%% -f .ci/docker-compose-file/docker-compose-cassandra.yaml \
|
|
%% -f .ci/docker-compose-file/docker-compose-toxiproxy.yaml \
|
|
%% up --build
|
|
%%
|
|
%% 2. Run use cases with special environment variables
|
|
%% CASSA_TCP_HOST=127.0.0.1 CASSA_TCP_PORT=19042 \
|
|
%% CASSA_TLS_HOST=127.0.0.1 CASSA_TLS_PORT=19142 \
|
|
%% PROXY_HOST=127.0.0.1 ./rebar3 as test ct -c -v --name ct@127.0.0.1 \
|
|
%% --suite apps/emqx_bridge_cassandra/test/emqx_bridge_cassandra_SUITE.erl
|
|
%%
|
|
|
|
%%------------------------------------------------------------------------------
|
|
%% CT boilerplate
|
|
%%------------------------------------------------------------------------------
|
|
|
|
all() ->
|
|
[
|
|
{group, tcp},
|
|
{group, tls}
|
|
].
|
|
|
|
groups() ->
|
|
TCs = emqx_common_test_helpers:all(?MODULE),
|
|
NonBatchCases = [t_write_timeout, t_simple_sql_query],
|
|
QueryModeGroups = [{group, async}, {group, sync}],
|
|
BatchingGroups = [
|
|
{group, with_batch},
|
|
{group, without_batch}
|
|
],
|
|
[
|
|
{tcp, QueryModeGroups},
|
|
{tls, QueryModeGroups},
|
|
{async, BatchingGroups},
|
|
{sync, BatchingGroups},
|
|
{with_batch, TCs -- NonBatchCases},
|
|
{without_batch, TCs}
|
|
].
|
|
|
|
init_per_group(tcp, Config) ->
|
|
Host = os:getenv("CASSA_TCP_HOST", "toxiproxy"),
|
|
Port = list_to_integer(os:getenv("CASSA_TCP_PORT", "9042")),
|
|
[
|
|
{cassa_host, Host},
|
|
{cassa_port, Port},
|
|
{enable_tls, false},
|
|
{proxy_name, "cassa_tcp"}
|
|
| Config
|
|
];
|
|
init_per_group(tls, Config) ->
|
|
Host = os:getenv("CASSA_TLS_HOST", "toxiproxy"),
|
|
Port = list_to_integer(os:getenv("CASSA_TLS_PORT", "9142")),
|
|
[
|
|
{cassa_host, Host},
|
|
{cassa_port, Port},
|
|
{enable_tls, true},
|
|
{proxy_name, "cassa_tls"}
|
|
| Config
|
|
];
|
|
init_per_group(async, Config) ->
|
|
[{query_mode, async} | Config];
|
|
init_per_group(sync, Config) ->
|
|
[{query_mode, sync} | Config];
|
|
init_per_group(with_batch, Config0) ->
|
|
Config = [{enable_batch, true} | Config0],
|
|
common_init(Config);
|
|
init_per_group(without_batch, Config0) ->
|
|
Config = [{enable_batch, false} | Config0],
|
|
common_init(Config);
|
|
init_per_group(_Group, Config) ->
|
|
Config.
|
|
|
|
end_per_group(Group, Config) when
|
|
Group == with_batch;
|
|
Group == without_batch
|
|
->
|
|
connect_and_drop_table(Config),
|
|
ProxyHost = ?config(proxy_host, Config),
|
|
ProxyPort = ?config(proxy_port, Config),
|
|
emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort),
|
|
Apps = ?config(apps, Config),
|
|
emqx_cth_suite:stop(Apps),
|
|
ok;
|
|
end_per_group(_Group, _Config) ->
|
|
ok.
|
|
|
|
init_per_suite(Config) ->
|
|
Config.
|
|
|
|
end_per_suite(_Config) ->
|
|
emqx_mgmt_api_test_util:end_suite(),
|
|
ok = emqx_common_test_helpers:stop_apps([emqx_bridge, emqx_conf]),
|
|
ok.
|
|
|
|
init_per_testcase(_Testcase, Config) ->
|
|
connect_and_clear_table(Config),
|
|
delete_bridge(Config),
|
|
snabbkaffe:start_trace(),
|
|
Config.
|
|
|
|
end_per_testcase(_Testcase, Config) ->
|
|
ProxyHost = ?config(proxy_host, Config),
|
|
ProxyPort = ?config(proxy_port, Config),
|
|
emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort),
|
|
ok = snabbkaffe:stop(),
|
|
connect_and_clear_table(Config),
|
|
delete_bridge(Config),
|
|
emqx_common_test_helpers:call_janitor(),
|
|
ok.
|
|
|
|
%%------------------------------------------------------------------------------
|
|
%% Helper fns
|
|
%%------------------------------------------------------------------------------
|
|
|
|
common_init(Config0) ->
|
|
ct:pal("commit_init: ~p~n", [Config0]),
|
|
BridgeType = proplists:get_value(bridge_type, Config0, <<"cassandra">>),
|
|
Host = ?config(cassa_host, Config0),
|
|
Port = ?config(cassa_port, Config0),
|
|
case emqx_common_test_helpers:is_tcp_server_available(Host, Port) of
|
|
true ->
|
|
% Setup toxiproxy
|
|
ProxyHost = os:getenv("PROXY_HOST", "toxiproxy"),
|
|
ProxyPort = list_to_integer(os:getenv("PROXY_PORT", "8474")),
|
|
emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort),
|
|
Apps = emqx_cth_suite:start(
|
|
[
|
|
emqx,
|
|
emqx_conf,
|
|
emqx_bridge_cassandra,
|
|
emqx_bridge,
|
|
emqx_rule_engine,
|
|
emqx_management,
|
|
{emqx_dashboard, "dashboard.listeners.http { enable = true, bind = 18083 }"}
|
|
],
|
|
#{work_dir => emqx_cth_suite:work_dir(Config0)}
|
|
),
|
|
{ok, _Api} = emqx_common_test_http:create_default_app(),
|
|
% Connect to cassnadra directly and create the table
|
|
catch connect_and_drop_table(Config0),
|
|
connect_and_create_table(Config0),
|
|
{Name, CassaConf} = cassa_config(BridgeType, Config0),
|
|
Config =
|
|
[
|
|
{apps, Apps},
|
|
{cassa_config, CassaConf},
|
|
{cassa_bridge_type, BridgeType},
|
|
{cassa_name, Name},
|
|
{bridge_type, BridgeType},
|
|
{bridge_name, Name},
|
|
{bridge_config, CassaConf},
|
|
{proxy_host, ProxyHost},
|
|
{proxy_port, ProxyPort}
|
|
| Config0
|
|
],
|
|
Config;
|
|
false ->
|
|
case os:getenv("IS_CI") of
|
|
"yes" ->
|
|
throw(no_cassandra);
|
|
_ ->
|
|
{skip, no_cassandra}
|
|
end
|
|
end.
|
|
|
|
cassa_config(BridgeType, Config) ->
|
|
Port = integer_to_list(?config(cassa_port, Config)),
|
|
Server = ?config(cassa_host, Config) ++ ":" ++ Port,
|
|
Name = atom_to_binary(?MODULE),
|
|
BatchSize =
|
|
case ?config(enable_batch, Config) of
|
|
true -> ?BATCH_SIZE;
|
|
false -> 1
|
|
end,
|
|
QueryMode = ?config(query_mode, Config),
|
|
TlsEnabled = ?config(enable_tls, Config),
|
|
ConfigString =
|
|
io_lib:format(
|
|
"bridges.~s.~s {\n"
|
|
" enable = true\n"
|
|
" servers = ~p\n"
|
|
" keyspace = ~p\n"
|
|
" username = ~p\n"
|
|
" password = ~p\n"
|
|
" cql = ~p\n"
|
|
" resource_opts = {\n"
|
|
" request_ttl = 500ms\n"
|
|
" batch_size = ~b\n"
|
|
" query_mode = ~s\n"
|
|
" }\n"
|
|
" ssl = {\n"
|
|
" enable = ~w\n"
|
|
" cacertfile = \"~s\"\n"
|
|
" certfile = \"~s\"\n"
|
|
" keyfile = \"~s\"\n"
|
|
" server_name_indication = disable\n"
|
|
" }\n"
|
|
"}",
|
|
[
|
|
BridgeType,
|
|
Name,
|
|
Server,
|
|
?CASSA_KEYSPACE,
|
|
?CASSA_USERNAME,
|
|
?CASSA_PASSWORD,
|
|
?SQL_BRIDGE,
|
|
BatchSize,
|
|
QueryMode,
|
|
TlsEnabled,
|
|
?CAFILE,
|
|
?CERTFILE,
|
|
?KEYFILE
|
|
]
|
|
),
|
|
{Name, parse_and_check(ConfigString, BridgeType, Name)}.
|
|
|
|
parse_and_check(ConfigString, BridgeType, Name) ->
|
|
{ok, RawConf} = hocon:binary(ConfigString, #{format => map}),
|
|
hocon_tconf:check_plain(emqx_bridge_schema, RawConf, #{required => false, atom_key => false}),
|
|
#{<<"bridges">> := #{BridgeType := #{Name := Config}}} = RawConf,
|
|
Config.
|
|
|
|
create_bridge(Config) ->
|
|
create_bridge(Config, _Overrides = #{}).
|
|
|
|
create_bridge(Config, Overrides) ->
|
|
BridgeType = ?config(cassa_bridge_type, Config),
|
|
Name = ?config(cassa_name, Config),
|
|
BridgeConfig0 = ?config(cassa_config, Config),
|
|
BridgeConfig = emqx_utils_maps:deep_merge(BridgeConfig0, Overrides),
|
|
emqx_bridge:create(BridgeType, Name, BridgeConfig).
|
|
|
|
delete_bridge(Config) ->
|
|
BridgeType = ?config(cassa_bridge_type, Config),
|
|
Name = ?config(cassa_name, Config),
|
|
emqx_bridge:remove(BridgeType, Name).
|
|
|
|
create_bridge_http(Params) ->
|
|
Path = emqx_mgmt_api_test_util:api_path(["bridges"]),
|
|
AuthHeader = emqx_mgmt_api_test_util:auth_header_(),
|
|
case emqx_mgmt_api_test_util:request_api(post, Path, "", AuthHeader, Params) of
|
|
{ok, Res} -> {ok, emqx_utils_json:decode(Res, [return_maps])};
|
|
Error -> Error
|
|
end.
|
|
|
|
bridges_probe_http(Params) ->
|
|
Path = emqx_mgmt_api_test_util:api_path(["bridges_probe"]),
|
|
AuthHeader = emqx_mgmt_api_test_util:auth_header_(),
|
|
case emqx_mgmt_api_test_util:request_api(post, Path, "", AuthHeader, Params) of
|
|
{ok, _} -> ok;
|
|
Error -> Error
|
|
end.
|
|
|
|
send_message(Config, Payload) ->
|
|
Name = ?config(cassa_name, Config),
|
|
BridgeType = ?config(cassa_bridge_type, Config),
|
|
BridgeID = emqx_bridge_resource:bridge_id(BridgeType, Name),
|
|
emqx_bridge:send_message(BridgeID, Payload).
|
|
|
|
query_resource(Config, Request) ->
|
|
Name = ?config(cassa_name, Config),
|
|
BridgeType = ?config(cassa_bridge_type, Config),
|
|
BridgeV2Id = emqx_bridge_v2:id(BridgeType, Name),
|
|
ConnectorResId = emqx_connector_resource:resource_id(BridgeType, Name),
|
|
emqx_resource:query(BridgeV2Id, Request, #{
|
|
timeout => 1_000, connector_resource_id => ConnectorResId
|
|
}).
|
|
|
|
query_resource_async(Config, Request) ->
|
|
Name = ?config(cassa_name, Config),
|
|
BridgeType = ?config(cassa_bridge_type, Config),
|
|
Ref = alias([reply]),
|
|
AsyncReplyFun = fun(Result) -> Ref ! {result, Ref, Result} end,
|
|
BridgeV2Id = emqx_bridge_v2:id(BridgeType, Name),
|
|
ConnectorResId = emqx_connector_resource:resource_id(BridgeType, Name),
|
|
Return = emqx_resource:query(BridgeV2Id, Request, #{
|
|
timeout => 500,
|
|
async_reply_fun => {AsyncReplyFun, []},
|
|
connector_resource_id => ConnectorResId,
|
|
query_mode => async
|
|
}),
|
|
{Return, Ref}.
|
|
|
|
receive_result(Ref, Timeout) when is_reference(Ref) ->
|
|
receive
|
|
{result, Ref, Result} ->
|
|
{ok, Result};
|
|
{Ref, Result} ->
|
|
{ok, Result}
|
|
after Timeout ->
|
|
timeout
|
|
end.
|
|
|
|
connect_direct_cassa(Config) ->
|
|
Opts = #{
|
|
nodes => [{?config(cassa_host, Config), ?config(cassa_port, Config)}],
|
|
username => ?CASSA_USERNAME,
|
|
password => ?CASSA_PASSWORD,
|
|
keyspace => ?CASSA_KEYSPACE
|
|
},
|
|
SslOpts =
|
|
case ?config(enable_tls, Config) of
|
|
true ->
|
|
Opts#{
|
|
ssl => emqx_tls_lib:to_client_opts(
|
|
#{
|
|
enable => true,
|
|
cacertfile => ?CAFILE,
|
|
certfile => ?CERTFILE,
|
|
keyfile => ?KEYFILE
|
|
}
|
|
)
|
|
};
|
|
false ->
|
|
Opts
|
|
end,
|
|
{ok, Con} = ecql:connect(maps:to_list(SslOpts)),
|
|
Con.
|
|
|
|
% These funs connect and then stop the cassandra connection
|
|
connect_and_create_table(Config) ->
|
|
connect_and_create_table(Config, ?SQL_CREATE_TABLE).
|
|
|
|
connect_and_create_table(Config, SQL) ->
|
|
with_direct_conn(Config, fun(Conn) ->
|
|
{ok, _} = ecql:query(Conn, SQL)
|
|
end).
|
|
|
|
connect_and_drop_table(Config) ->
|
|
connect_and_drop_table(Config, ?SQL_DROP_TABLE).
|
|
|
|
connect_and_drop_table(Config, SQL) ->
|
|
with_direct_conn(Config, fun(Conn) ->
|
|
{ok, _} = ecql:query(Conn, SQL)
|
|
end).
|
|
|
|
connect_and_clear_table(Config) ->
|
|
with_direct_conn(Config, fun(Conn) ->
|
|
ok = ecql:query(Conn, ?SQL_DELETE)
|
|
end).
|
|
|
|
connect_and_get_payload(Config) ->
|
|
connect_and_get_payload(Config, ?SQL_SELECT).
|
|
|
|
connect_and_get_payload(Config, SQL) ->
|
|
with_direct_conn(Config, fun(Conn) ->
|
|
{ok, {_Keyspace, _ColsSpec, [[Result]]}} = ecql:query(Conn, SQL),
|
|
Result
|
|
end).
|
|
|
|
with_direct_conn(Config, Fn) ->
|
|
Conn = connect_direct_cassa(Config),
|
|
try
|
|
Fn(Conn)
|
|
after
|
|
ok = ecql:close(Conn)
|
|
end.
|
|
|
|
%%------------------------------------------------------------------------------
|
|
%% Testcases
|
|
%%------------------------------------------------------------------------------
|
|
|
|
t_setup_via_config_and_publish(Config) ->
|
|
?assertMatch(
|
|
{ok, _},
|
|
create_bridge(Config)
|
|
),
|
|
Val = integer_to_binary(erlang:unique_integer()),
|
|
SentData = #{
|
|
topic => atom_to_binary(?FUNCTION_NAME),
|
|
payload => Val,
|
|
timestamp => 1668602148000
|
|
},
|
|
?check_trace(
|
|
begin
|
|
?wait_async_action(
|
|
?assertEqual(ok, send_message(Config, SentData)),
|
|
#{?snk_kind := cassandra_connector_query_return},
|
|
10_000
|
|
),
|
|
?assertMatch(
|
|
Val,
|
|
connect_and_get_payload(Config)
|
|
),
|
|
ok
|
|
end,
|
|
fun(Trace0) ->
|
|
Trace = ?of_kind(cassandra_connector_query_return, Trace0),
|
|
?assertMatch([#{result := {ok, _Pid}}], Trace),
|
|
ok
|
|
end
|
|
),
|
|
ok.
|
|
|
|
t_setup_via_http_api_and_publish(Config) ->
|
|
BridgeType = ?config(cassa_bridge_type, Config),
|
|
Name = ?config(cassa_name, Config),
|
|
BridgeConfig0 = ?config(cassa_config, Config),
|
|
BridgeConfig = BridgeConfig0#{
|
|
<<"name">> => Name,
|
|
<<"type">> => BridgeType
|
|
},
|
|
?assertMatch(
|
|
{ok, _},
|
|
create_bridge_http(BridgeConfig)
|
|
),
|
|
Val = integer_to_binary(erlang:unique_integer()),
|
|
SentData = #{
|
|
topic => atom_to_binary(?FUNCTION_NAME),
|
|
payload => Val,
|
|
timestamp => 1668602148000
|
|
},
|
|
?check_trace(
|
|
begin
|
|
?wait_async_action(
|
|
?assertEqual(ok, send_message(Config, SentData)),
|
|
#{?snk_kind := cassandra_connector_query_return},
|
|
10_000
|
|
),
|
|
?assertMatch(
|
|
Val,
|
|
connect_and_get_payload(Config)
|
|
),
|
|
ok
|
|
end,
|
|
fun(Trace0) ->
|
|
Trace = ?of_kind(cassandra_connector_query_return, Trace0),
|
|
?assertMatch([#{result := {ok, _Pid}}], Trace),
|
|
ok
|
|
end
|
|
),
|
|
ok.
|
|
|
|
t_get_status(Config) ->
|
|
?assertMatch(
|
|
{ok, _},
|
|
create_bridge(Config)
|
|
),
|
|
ProxyPort = ?config(proxy_port, Config),
|
|
ProxyHost = ?config(proxy_host, Config),
|
|
ProxyName = ?config(proxy_name, Config),
|
|
|
|
Name = ?config(cassa_name, Config),
|
|
BridgeType = ?config(cassa_bridge_type, Config),
|
|
ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name),
|
|
|
|
?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceID)),
|
|
emqx_common_test_helpers:with_failure(down, ProxyName, ProxyHost, ProxyPort, fun() ->
|
|
?assertMatch(
|
|
{ok, Status} when Status =:= disconnected orelse Status =:= connecting,
|
|
emqx_resource_manager:health_check(ResourceID)
|
|
)
|
|
end),
|
|
ok.
|
|
|
|
t_bridges_probe_via_http(Config) ->
|
|
BridgeType = ?config(cassa_bridge_type, Config),
|
|
Name = ?config(cassa_name, Config),
|
|
BridgeConfig0 = ?config(cassa_config, Config),
|
|
BridgeConfig = BridgeConfig0#{
|
|
<<"name">> => Name,
|
|
<<"type">> => BridgeType
|
|
},
|
|
?assertMatch(ok, bridges_probe_http(BridgeConfig)),
|
|
|
|
ok.
|
|
|
|
t_create_disconnected(Config) ->
|
|
ProxyPort = ?config(proxy_port, Config),
|
|
ProxyHost = ?config(proxy_host, Config),
|
|
ProxyName = ?config(proxy_name, Config),
|
|
?check_trace(
|
|
emqx_common_test_helpers:with_failure(down, ProxyName, ProxyHost, ProxyPort, fun() ->
|
|
?assertMatch({ok, _}, create_bridge(Config))
|
|
end),
|
|
fun(Trace) ->
|
|
?assertMatch(
|
|
[#{error := {start_pool_failed, _, _}}],
|
|
?of_kind(cassandra_connector_start_failed, Trace)
|
|
),
|
|
ok
|
|
end
|
|
),
|
|
ok.
|
|
|
|
t_write_failure(Config) ->
|
|
ProxyName = ?config(proxy_name, Config),
|
|
ProxyPort = ?config(proxy_port, Config),
|
|
ProxyHost = ?config(proxy_host, Config),
|
|
QueryMode = ?config(query_mode, Config),
|
|
{ok, _} = create_bridge(
|
|
Config,
|
|
#{
|
|
<<"resource_opts">> =>
|
|
#{
|
|
<<"resume_interval">> => <<"100ms">>,
|
|
<<"health_check_interval">> => <<"100ms">>
|
|
}
|
|
}
|
|
),
|
|
Val = integer_to_binary(erlang:unique_integer()),
|
|
SentData = #{
|
|
topic => atom_to_binary(?FUNCTION_NAME),
|
|
payload => Val,
|
|
timestamp => 1668602148000
|
|
},
|
|
?check_trace(
|
|
emqx_common_test_helpers:with_failure(down, ProxyName, ProxyHost, ProxyPort, fun() ->
|
|
{_, {ok, _}} =
|
|
?wait_async_action(
|
|
case QueryMode of
|
|
sync ->
|
|
?assertMatch({error, _}, send_message(Config, SentData));
|
|
async ->
|
|
send_message(Config, SentData)
|
|
end,
|
|
#{?snk_kind := Evt} when
|
|
Evt =:= buffer_worker_flush_nack orelse
|
|
Evt =:= buffer_worker_retry_inflight_failed,
|
|
10_000
|
|
)
|
|
end),
|
|
fun(Trace0) ->
|
|
ct:pal("trace: ~p", [Trace0]),
|
|
Trace = ?of_kind(
|
|
[buffer_worker_flush_nack, buffer_worker_retry_inflight_failed], Trace0
|
|
),
|
|
[#{result := Result} | _] = Trace,
|
|
case Result of
|
|
{async_return, {error, {resource_error, _}}} ->
|
|
ok;
|
|
{async_return, {error, {recoverable_error, disconnected}}} ->
|
|
ok;
|
|
{error, {resource_error, _}} ->
|
|
ok;
|
|
_ ->
|
|
ct:fail("unexpected error: ~p", [Result])
|
|
end
|
|
end
|
|
),
|
|
ok.
|
|
|
|
%% This test doesn't work with batch enabled since it is not possible
|
|
%% to set the timeout directly for batch queries
|
|
%%
|
|
%% XXX: parameter with request timeout is not supported yet.
|
|
%%
|
|
%t_write_timeout(Config) ->
|
|
% ProxyName = ?config(proxy_name, Config),
|
|
% ProxyPort = ?config(proxy_port, Config),
|
|
% ProxyHost = ?config(proxy_host, Config),
|
|
% {ok, _} = create_bridge(Config),
|
|
% Val = integer_to_binary(erlang:unique_integer()),
|
|
% SentData = #{payload => Val, timestamp => 1668602148000},
|
|
% Timeout = 1000,
|
|
% emqx_common_test_helpers:with_failure(timeout, ProxyName, ProxyHost, ProxyPort, fun() ->
|
|
% ?assertMatch(
|
|
% {error, {resource_error, #{reason := timeout}}},
|
|
% query_resource(Config, {send_message, SentData, [], Timeout})
|
|
% )
|
|
% end),
|
|
% ok.
|
|
|
|
t_simple_sql_query(Config) ->
|
|
QueryMode = ?config(query_mode, Config),
|
|
?assertMatch(
|
|
{ok, _},
|
|
create_bridge(Config)
|
|
),
|
|
Request = {query, <<"SELECT count(1) AS T FROM system.local">>},
|
|
Result =
|
|
case QueryMode of
|
|
sync ->
|
|
query_resource(Config, Request);
|
|
async ->
|
|
{_, Ref} = query_resource_async(Config, Request),
|
|
{ok, Res} = receive_result(Ref, 2_000),
|
|
Res
|
|
end,
|
|
?assertMatch({ok, {<<"system.local">>, _, [[1]]}}, Result),
|
|
ok.
|
|
|
|
t_missing_data(Config) ->
|
|
?assertMatch(
|
|
{ok, _},
|
|
create_bridge(Config)
|
|
),
|
|
%% emqx_bridge_cassandra_connector will send missed data as a `null` atom
|
|
%% to ecql driver
|
|
?check_trace(
|
|
begin
|
|
{_, {ok, _}} =
|
|
?wait_async_action(
|
|
send_message(Config, #{}),
|
|
#{?snk_kind := handle_async_reply, result := {error, {8704, _}}},
|
|
30_000
|
|
),
|
|
ok
|
|
end,
|
|
fun(Trace0) ->
|
|
%% 1. ecql driver will return `ok` first in async query
|
|
Trace = ?of_kind(cassandra_connector_query_return, Trace0),
|
|
?assertMatch([#{result := {ok, _Pid}}], Trace),
|
|
%% 2. then it will return an error in callback function
|
|
Trace1 = ?of_kind(handle_async_reply, Trace0),
|
|
?assertMatch([#{result := {error, {8704, _}}}], Trace1),
|
|
ok
|
|
end
|
|
),
|
|
ok.
|
|
|
|
t_bad_sql_parameter(Config) ->
|
|
QueryMode = ?config(query_mode, Config),
|
|
?assertMatch(
|
|
{ok, _},
|
|
create_bridge(
|
|
Config,
|
|
#{
|
|
<<"resource_opts">> => #{
|
|
<<"request_ttl">> => <<"500ms">>,
|
|
<<"resume_interval">> => <<"100ms">>,
|
|
<<"health_check_interval">> => <<"100ms">>
|
|
}
|
|
}
|
|
)
|
|
),
|
|
Request = {query, <<"">>, [bad_parameter]},
|
|
Result =
|
|
case QueryMode of
|
|
sync ->
|
|
query_resource(Config, Request);
|
|
async ->
|
|
{_, Ref} = query_resource_async(Config, Request),
|
|
case receive_result(Ref, 5_000) of
|
|
{ok, Res} ->
|
|
Res;
|
|
timeout ->
|
|
ct:pal("mailbox:\n ~p", [process_info(self(), messages)]),
|
|
ct:fail("no response received")
|
|
end
|
|
end,
|
|
?assertMatch({error, _}, Result),
|
|
ok.
|
|
|
|
t_nasty_sql_string(Config) ->
|
|
?assertMatch({ok, _}, create_bridge(Config)),
|
|
Payload = list_to_binary(lists:seq(1, 127)),
|
|
Message = #{
|
|
topic => atom_to_binary(?FUNCTION_NAME),
|
|
payload => Payload,
|
|
timestamp => erlang:system_time(millisecond)
|
|
},
|
|
%% XXX: why ok instead of {ok, AffectedLines}?
|
|
?assertEqual(ok, send_message(Config, Message)),
|
|
?assertEqual(Payload, connect_and_get_payload(Config)).
|
|
|
|
t_insert_null_into_int_column(Config) ->
|
|
BridgeType = ?config(bridge_type, Config),
|
|
connect_and_create_table(
|
|
Config,
|
|
<<
|
|
"CREATE TABLE mqtt.mqtt_msg_test2 (\n"
|
|
" topic text,\n"
|
|
" payload text,\n"
|
|
" arrived timestamp,\n"
|
|
" x int,\n"
|
|
" PRIMARY KEY (topic)\n"
|
|
")"
|
|
>>
|
|
),
|
|
on_exit(fun() -> connect_and_drop_table(Config, "DROP TABLE mqtt.mqtt_msg_test2") end),
|
|
{ok, {{_, 201, _}, _, _}} =
|
|
emqx_bridge_testlib:create_bridge_api(
|
|
Config,
|
|
#{
|
|
<<"cql">> => <<
|
|
"insert into mqtt_msg_test2(topic, payload, x, arrived) "
|
|
"values (${topic}, ${payload}, ${x}, ${timestamp})"
|
|
>>
|
|
}
|
|
),
|
|
RuleTopic = <<"t/c">>,
|
|
Opts = #{
|
|
sql => <<"select *, first(jq('null', payload)) as x from \"", RuleTopic/binary, "\"">>
|
|
},
|
|
{ok, _} = emqx_bridge_testlib:create_rule_and_action_http(BridgeType, RuleTopic, Config, Opts),
|
|
|
|
Payload = <<"{}">>,
|
|
Msg = emqx_message:make(RuleTopic, Payload),
|
|
{_, {ok, _}} =
|
|
?wait_async_action(
|
|
emqx:publish(Msg),
|
|
#{?snk_kind := cassandra_connector_query_return},
|
|
10_000
|
|
),
|
|
|
|
%% Would return `1853189228' if it encodes `null' as an integer...
|
|
?assertEqual(null, connect_and_get_payload(Config, "select x from mqtt.mqtt_msg_test2")),
|
|
|
|
ok.
|