diff --git a/apps/emqx_bridge_oracle/test/emqx_bridge_oracle_SUITE.erl b/apps/emqx_bridge_oracle/test/emqx_bridge_oracle_SUITE.erl index 79e8ae36e..06b0256e2 100644 --- a/apps/emqx_bridge_oracle/test/emqx_bridge_oracle_SUITE.erl +++ b/apps/emqx_bridge_oracle/test/emqx_bridge_oracle_SUITE.erl @@ -165,19 +165,32 @@ sql_create_table() -> "CREATE TABLE mqtt_test (topic VARCHAR2(255), msgid VARCHAR2(64), payload NCLOB, retain NUMBER(1))". sql_drop_table() -> - "DROP TABLE mqtt_test". + "BEGIN\n" + " EXECUTE IMMEDIATE 'DROP TABLE mqtt_test';\n" + " EXCEPTION\n" + " WHEN OTHERS THEN\n" + " IF SQLCODE = -942 THEN\n" + " NULL;\n" + " ELSE\n" + " RAISE;\n" + " END IF;\n" + " END;". + +sql_check_table_exist() -> + "SELECT COUNT(*) FROM user_tables WHERE table_name = 'MQTT_TEST'". reset_table(Config) -> ResourceId = resource_id(Config), - _ = emqx_resource:simple_sync_query(ResourceId, {sql, sql_drop_table()}), + drop_table_if_exists(Config), {ok, [{proc_result, 0, _}]} = emqx_resource:simple_sync_query( ResourceId, {sql, sql_create_table()} ), ok. -drop_table(Config) -> +drop_table_if_exists(Config) -> ResourceId = resource_id(Config), - emqx_resource:simple_sync_query(ResourceId, {query, sql_drop_table()}), + {ok, [{proc_result, 0, _}]} = + emqx_resource:simple_sync_query(ResourceId, {query, sql_drop_table()}), ok. oracle_config(TestCase, _ConnectionType, Config) -> @@ -392,6 +405,12 @@ t_batch_sync_query(Config) -> emqx_bridge:send_message(BridgeId, Params), ok end), + % Wait for reconnection. + ?retry( + _Sleep = 1_000, + _Attempts = 30, + ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId)) + ), ?retry( _Sleep = 1_000, _Attempts = 30, @@ -527,3 +546,32 @@ t_no_sid_nor_service_name(Config0) -> create_bridge(Config) ), ok. + +t_table_removed(Config) -> + ResourceId = resource_id(Config), + ?check_trace( + begin + ?assertMatch({ok, _}, create_bridge_api(Config)), + ?retry( + _Sleep = 1_000, + _Attempts = 20, + ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId)) + ), + drop_table_if_exists(Config), + MsgId = erlang:unique_integer(), + Params = #{ + topic => ?config(mqtt_topic, Config), + id => MsgId, + payload => ?config(oracle_name, Config), + retain => true + }, + Message = {send_message, Params}, + ?assertEqual( + {error, {unrecoverable_error, {942, "ORA-00942: table or view does not exist\n"}}}, + emqx_resource:simple_sync_query(ResourceId, Message) + ), + ok + end, + [] + ), + ok. diff --git a/apps/emqx_oracle/src/emqx_oracle.erl b/apps/emqx_oracle/src/emqx_oracle.erl index ae2128a7e..0f543badd 100644 --- a/apps/emqx_oracle/src/emqx_oracle.erl +++ b/apps/emqx_oracle/src/emqx_oracle.erl @@ -351,6 +351,10 @@ to_bin(Bin) when is_binary(Bin) -> to_bin(Atom) when is_atom(Atom) -> erlang:atom_to_binary(Atom). +handle_result({error, {recoverable_error, _Error}} = Res) -> + Res; +handle_result({error, {unrecoverable_error, _Error}} = Res) -> + Res; handle_result({error, disconnected}) -> {error, {recoverable_error, disconnected}}; handle_result({error, Error}) -> @@ -359,6 +363,8 @@ handle_result({error, socket, closed} = Error) -> {error, {recoverable_error, Error}}; handle_result({error, Type, Reason}) -> {error, {unrecoverable_error, {Type, Reason}}}; +handle_result({ok, [{proc_result, RetCode, Reason}]}) when RetCode =/= 0 -> + {error, {unrecoverable_error, {RetCode, Reason}}}; handle_result(Res) -> Res.