test(mysql): slightly simplify bridge testsuite

This commit is contained in:
Andrew Mayorov 2023-11-07 15:56:18 +07:00
parent 36e57a479d
commit f827df2821
No known key found for this signature in database
GPG Key ID: 2837C62ACFBFED5D
1 changed files with 42 additions and 62 deletions

View File

@ -21,7 +21,6 @@
"DEFAULT CHARSET=utf8MB4;" "DEFAULT CHARSET=utf8MB4;"
). ).
-define(SQL_DROP_TABLE, "DROP TABLE mqtt_test"). -define(SQL_DROP_TABLE, "DROP TABLE mqtt_test").
-define(SQL_DELETE, "DELETE from mqtt_test").
-define(SQL_SELECT, "SELECT payload FROM mqtt_test"). -define(SQL_SELECT, "SELECT payload FROM mqtt_test").
% DB defaults % DB defaults
@ -112,8 +111,8 @@ end_per_suite(_Config) ->
ok. ok.
init_per_testcase(_Testcase, Config) -> init_per_testcase(_Testcase, Config) ->
connect_and_drop_table(Config),
connect_and_create_table(Config), connect_and_create_table(Config),
connect_and_clear_table(Config),
delete_bridge(Config), delete_bridge(Config),
snabbkaffe:start_trace(), snabbkaffe:start_trace(),
Config. Config.
@ -122,9 +121,7 @@ end_per_testcase(_Testcase, Config) ->
ProxyHost = ?config(proxy_host, Config), ProxyHost = ?config(proxy_host, Config),
ProxyPort = ?config(proxy_port, Config), ProxyPort = ?config(proxy_port, Config),
emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort), emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort),
connect_and_clear_table(Config),
ok = snabbkaffe:stop(), ok = snabbkaffe:stop(),
delete_bridge(Config),
emqx_common_test_helpers:call_janitor(), emqx_common_test_helpers:call_janitor(),
ok. ok.
@ -323,9 +320,6 @@ connect_and_create_table(Config) ->
connect_and_drop_table(Config) -> connect_and_drop_table(Config) ->
query_direct_mysql(Config, ?SQL_DROP_TABLE). query_direct_mysql(Config, ?SQL_DROP_TABLE).
connect_and_clear_table(Config) ->
query_direct_mysql(Config, ?SQL_DELETE).
connect_and_get_payload(Config) -> connect_and_get_payload(Config) ->
query_direct_mysql(Config, ?SQL_SELECT). query_direct_mysql(Config, ?SQL_SELECT).
@ -777,28 +771,21 @@ t_table_removed(Config) ->
Name = ?config(mysql_name, Config), Name = ?config(mysql_name, Config),
BridgeType = ?config(mysql_bridge_type, Config), BridgeType = ?config(mysql_bridge_type, Config),
ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name), ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name),
?check_trace( connect_and_create_table(Config),
begin ?assertMatch({ok, _}, create_bridge(Config)),
connect_and_create_table(Config), ?retry(
?assertMatch({ok, _}, create_bridge(Config)), _Sleep = 1_000,
?retry( _Attempts = 20,
_Sleep = 1_000, ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceID))
_Attempts = 20, ),
?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceID)) connect_and_drop_table(Config),
), Val = integer_to_binary(erlang:unique_integer()),
connect_and_drop_table(Config), SentData = #{payload => Val, timestamp => 1668602148000},
Val = integer_to_binary(erlang:unique_integer()), Timeout = 1000,
SentData = #{payload => Val, timestamp => 1668602148000}, ?assertMatch(
Timeout = 1000, {error,
?assertMatch( {unrecoverable_error, {1146, <<"42S02">>, <<"Table 'mqtt.mqtt_test' doesn't exist">>}}},
{error, sync_query_resource(Config, {send_message, SentData, [], Timeout})
{unrecoverable_error,
{1146, <<"42S02">>, <<"Table 'mqtt.mqtt_test' doesn't exist">>}}},
sync_query_resource(Config, {send_message, SentData, [], Timeout})
),
ok
end,
[]
), ),
ok. ok.
@ -807,38 +794,31 @@ t_nested_payload_template(Config) ->
BridgeType = ?config(mysql_bridge_type, Config), BridgeType = ?config(mysql_bridge_type, Config),
ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name), ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name),
Value = integer_to_binary(erlang:unique_integer()), Value = integer_to_binary(erlang:unique_integer()),
?check_trace( {ok, _} = create_bridge(
begin Config,
connect_and_create_table(Config), #{
{ok, _} = create_bridge( <<"sql">> =>
Config, "INSERT INTO mqtt_test(payload, arrived) "
#{ "VALUES (${payload.value}, FROM_UNIXTIME(${timestamp}/1000))"
<<"sql">> => }
"INSERT INTO mqtt_test(payload, arrived) " ),
"VALUES (${payload.value}, FROM_UNIXTIME(${timestamp}/1000))" {ok, #{<<"from">> := [Topic]}} = create_rule_and_action_http(Config),
} ?retry(
), _Sleep = 1_000,
{ok, #{<<"from">> := [Topic]}} = create_rule_and_action_http(Config), _Attempts = 20,
?retry( ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceID))
_Sleep = 1_000, ),
_Attempts = 20, %% send message via rule action
?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceID)) Payload = emqx_utils_json:encode(#{value => Value}),
), Message = emqx_message:make(Topic, Payload),
%% send message via rule action {_, {ok, _}} =
Payload = emqx_utils_json:encode(#{value => Value}), ?wait_async_action(
Message = emqx_message:make(Topic, Payload), emqx:publish(Message),
{_, {ok, _}} = #{?snk_kind := mysql_connector_query_return},
?wait_async_action( 10_000
emqx:publish(Message), ),
#{?snk_kind := mysql_connector_query_return}, ?assertEqual(
10_000 {ok, [<<"payload">>], [[Value]]},
), connect_and_get_payload(Config)
?assertEqual(
{ok, [<<"payload">>], [[Value]]},
connect_and_get_payload(Config)
),
ok
end,
[]
), ),
ok. ok.