From fb455d68a3ac4cbf5c33d5d1168599f8855e6bf1 Mon Sep 17 00:00:00 2001 From: Paulo Zulato Date: Tue, 30 May 2023 21:44:48 -0300 Subject: [PATCH] fix(oracle): fix error handling on sync query Fixes https://emqx.atlassian.net/browse/EMQX-10075 --- .../test/emqx_bridge_oracle_SUITE.erl | 56 +++++++++++++++++-- apps/emqx_oracle/src/emqx_oracle.app.src | 2 +- apps/emqx_oracle/src/emqx_oracle.erl | 6 ++ 3 files changed, 59 insertions(+), 5 deletions(-) diff --git a/apps/emqx_bridge_oracle/test/emqx_bridge_oracle_SUITE.erl b/apps/emqx_bridge_oracle/test/emqx_bridge_oracle_SUITE.erl index 2e72458b6..483a8f484 100644 --- a/apps/emqx_bridge_oracle/test/emqx_bridge_oracle_SUITE.erl +++ b/apps/emqx_bridge_oracle/test/emqx_bridge_oracle_SUITE.erl @@ -165,19 +165,32 @@ sql_create_table() -> "CREATE TABLE mqtt_test (topic VARCHAR2(255), msgid VARCHAR2(64), payload NCLOB, retain NUMBER(1))". sql_drop_table() -> - "DROP TABLE mqtt_test". + "BEGIN\n" + " EXECUTE IMMEDIATE 'DROP TABLE mqtt_test';\n" + " EXCEPTION\n" + " WHEN OTHERS THEN\n" + " IF SQLCODE = -942 THEN\n" + " NULL;\n" + " ELSE\n" + " RAISE;\n" + " END IF;\n" + " END;". + +sql_check_table_exist() -> + "SELECT COUNT(*) FROM user_tables WHERE table_name = 'MQTT_TEST'". reset_table(Config) -> ResourceId = resource_id(Config), - _ = emqx_resource:simple_sync_query(ResourceId, {sql, sql_drop_table()}), + drop_table_if_exists(Config), {ok, [{proc_result, 0, _}]} = emqx_resource:simple_sync_query( ResourceId, {sql, sql_create_table()} ), ok. -drop_table(Config) -> +drop_table_if_exists(Config) -> ResourceId = resource_id(Config), - emqx_resource:simple_sync_query(ResourceId, {query, sql_drop_table()}), + {ok, [{proc_result, 0, _}]} = + emqx_resource:simple_sync_query(ResourceId, {query, sql_drop_table()}), ok. oracle_config(TestCase, _ConnectionType, Config) -> @@ -394,6 +407,12 @@ t_batch_sync_query(Config) -> emqx_bridge:send_message(BridgeId, Params), ok end), + % Wait for reconnection. + ?retry( + _Sleep = 1_000, + _Attempts = 30, + ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId)) + ), ?retry( _Sleep = 1_000, _Attempts = 30, @@ -529,3 +548,32 @@ t_no_sid_nor_service_name(Config0) -> create_bridge(Config) ), ok. + +t_table_removed(Config) -> + ResourceId = resource_id(Config), + ?check_trace( + begin + ?assertMatch({ok, _}, create_bridge_api(Config)), + ?retry( + _Sleep = 1_000, + _Attempts = 20, + ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId)) + ), + drop_table_if_exists(Config), + MsgId = erlang:unique_integer(), + Params = #{ + topic => ?config(mqtt_topic, Config), + id => MsgId, + payload => ?config(oracle_name, Config), + retain => true + }, + Message = {send_message, Params}, + ?assertEqual( + {error, {unrecoverable_error, {942, "ORA-00942: table or view does not exist\n"}}}, + emqx_resource:simple_sync_query(ResourceId, Message) + ), + ok + end, + [] + ), + ok. diff --git a/apps/emqx_oracle/src/emqx_oracle.app.src b/apps/emqx_oracle/src/emqx_oracle.app.src index 3beda05a4..10dbe7990 100644 --- a/apps/emqx_oracle/src/emqx_oracle.app.src +++ b/apps/emqx_oracle/src/emqx_oracle.app.src @@ -1,6 +1,6 @@ {application, emqx_oracle, [ {description, "EMQX Enterprise Oracle Database Connector"}, - {vsn, "0.1.1"}, + {vsn, "0.1.2"}, {registered, []}, {applications, [ kernel, diff --git a/apps/emqx_oracle/src/emqx_oracle.erl b/apps/emqx_oracle/src/emqx_oracle.erl index c5d8ecc77..5bdd6fc07 100644 --- a/apps/emqx_oracle/src/emqx_oracle.erl +++ b/apps/emqx_oracle/src/emqx_oracle.erl @@ -346,6 +346,10 @@ to_bin(Bin) when is_binary(Bin) -> to_bin(Atom) when is_atom(Atom) -> erlang:atom_to_binary(Atom). +handle_result({error, {recoverable_error, _Error}} = Res) -> + Res; +handle_result({error, {unrecoverable_error, _Error}} = Res) -> + Res; handle_result({error, disconnected}) -> {error, {recoverable_error, disconnected}}; handle_result({error, Error}) -> @@ -354,6 +358,8 @@ handle_result({error, socket, closed} = Error) -> {error, {recoverable_error, Error}}; handle_result({error, Type, Reason}) -> {error, {unrecoverable_error, {Type, Reason}}}; +handle_result({ok, [{proc_result, RetCode, Reason}]}) when RetCode =/= 0 -> + {error, {unrecoverable_error, {RetCode, Reason}}}; handle_result(Res) -> Res.