diff --git a/apps/emqx_bridge_mysql/test/emqx_bridge_mysql_SUITE.erl b/apps/emqx_bridge_mysql/test/emqx_bridge_mysql_SUITE.erl index a34b65ede..98b957b19 100644 --- a/apps/emqx_bridge_mysql/test/emqx_bridge_mysql_SUITE.erl +++ b/apps/emqx_bridge_mysql/test/emqx_bridge_mysql_SUITE.erl @@ -21,7 +21,6 @@ "DEFAULT CHARSET=utf8MB4;" ). -define(SQL_DROP_TABLE, "DROP TABLE mqtt_test"). --define(SQL_DELETE, "DELETE from mqtt_test"). -define(SQL_SELECT, "SELECT payload FROM mqtt_test"). % DB defaults @@ -112,8 +111,8 @@ end_per_suite(_Config) -> ok. init_per_testcase(_Testcase, Config) -> + connect_and_drop_table(Config), connect_and_create_table(Config), - connect_and_clear_table(Config), delete_bridge(Config), snabbkaffe:start_trace(), Config. @@ -122,9 +121,7 @@ end_per_testcase(_Testcase, Config) -> ProxyHost = ?config(proxy_host, Config), ProxyPort = ?config(proxy_port, Config), emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort), - connect_and_clear_table(Config), ok = snabbkaffe:stop(), - delete_bridge(Config), emqx_common_test_helpers:call_janitor(), ok. @@ -323,9 +320,6 @@ connect_and_create_table(Config) -> connect_and_drop_table(Config) -> query_direct_mysql(Config, ?SQL_DROP_TABLE). -connect_and_clear_table(Config) -> - query_direct_mysql(Config, ?SQL_DELETE). - connect_and_get_payload(Config) -> query_direct_mysql(Config, ?SQL_SELECT). @@ -777,28 +771,21 @@ t_table_removed(Config) -> Name = ?config(mysql_name, Config), BridgeType = ?config(mysql_bridge_type, Config), ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name), - ?check_trace( - begin - connect_and_create_table(Config), - ?assertMatch({ok, _}, create_bridge(Config)), - ?retry( - _Sleep = 1_000, - _Attempts = 20, - ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceID)) - ), - connect_and_drop_table(Config), - Val = integer_to_binary(erlang:unique_integer()), - SentData = #{payload => Val, timestamp => 1668602148000}, - Timeout = 1000, - ?assertMatch( - {error, - {unrecoverable_error, - {1146, <<"42S02">>, <<"Table 'mqtt.mqtt_test' doesn't exist">>}}}, - sync_query_resource(Config, {send_message, SentData, [], Timeout}) - ), - ok - end, - [] + connect_and_create_table(Config), + ?assertMatch({ok, _}, create_bridge(Config)), + ?retry( + _Sleep = 1_000, + _Attempts = 20, + ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceID)) + ), + connect_and_drop_table(Config), + Val = integer_to_binary(erlang:unique_integer()), + SentData = #{payload => Val, timestamp => 1668602148000}, + Timeout = 1000, + ?assertMatch( + {error, + {unrecoverable_error, {1146, <<"42S02">>, <<"Table 'mqtt.mqtt_test' doesn't exist">>}}}, + sync_query_resource(Config, {send_message, SentData, [], Timeout}) ), ok. @@ -807,38 +794,31 @@ t_nested_payload_template(Config) -> BridgeType = ?config(mysql_bridge_type, Config), ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name), Value = integer_to_binary(erlang:unique_integer()), - ?check_trace( - begin - connect_and_create_table(Config), - {ok, _} = create_bridge( - Config, - #{ - <<"sql">> => - "INSERT INTO mqtt_test(payload, arrived) " - "VALUES (${payload.value}, FROM_UNIXTIME(${timestamp}/1000))" - } - ), - {ok, #{<<"from">> := [Topic]}} = create_rule_and_action_http(Config), - ?retry( - _Sleep = 1_000, - _Attempts = 20, - ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceID)) - ), - %% send message via rule action - Payload = emqx_utils_json:encode(#{value => Value}), - Message = emqx_message:make(Topic, Payload), - {_, {ok, _}} = - ?wait_async_action( - emqx:publish(Message), - #{?snk_kind := mysql_connector_query_return}, - 10_000 - ), - ?assertEqual( - {ok, [<<"payload">>], [[Value]]}, - connect_and_get_payload(Config) - ), - ok - end, - [] + {ok, _} = create_bridge( + Config, + #{ + <<"sql">> => + "INSERT INTO mqtt_test(payload, arrived) " + "VALUES (${payload.value}, FROM_UNIXTIME(${timestamp}/1000))" + } + ), + {ok, #{<<"from">> := [Topic]}} = create_rule_and_action_http(Config), + ?retry( + _Sleep = 1_000, + _Attempts = 20, + ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceID)) + ), + %% send message via rule action + Payload = emqx_utils_json:encode(#{value => Value}), + Message = emqx_message:make(Topic, Payload), + {_, {ok, _}} = + ?wait_async_action( + emqx:publish(Message), + #{?snk_kind := mysql_connector_query_return}, + 10_000 + ), + ?assertEqual( + {ok, [<<"payload">>], [[Value]]}, + connect_and_get_payload(Config) ), ok.