fix(oracle): fix error handling on sync query
Fixes https://emqx.atlassian.net/browse/EMQX-10075
This commit is contained in:
parent
364601c3aa
commit
fb455d68a3
|
@ -165,19 +165,32 @@ sql_create_table() ->
|
||||||
"CREATE TABLE mqtt_test (topic VARCHAR2(255), msgid VARCHAR2(64), payload NCLOB, retain NUMBER(1))".
|
"CREATE TABLE mqtt_test (topic VARCHAR2(255), msgid VARCHAR2(64), payload NCLOB, retain NUMBER(1))".
|
||||||
|
|
||||||
sql_drop_table() ->
|
sql_drop_table() ->
|
||||||
"DROP TABLE mqtt_test".
|
"BEGIN\n"
|
||||||
|
" EXECUTE IMMEDIATE 'DROP TABLE mqtt_test';\n"
|
||||||
|
" EXCEPTION\n"
|
||||||
|
" WHEN OTHERS THEN\n"
|
||||||
|
" IF SQLCODE = -942 THEN\n"
|
||||||
|
" NULL;\n"
|
||||||
|
" ELSE\n"
|
||||||
|
" RAISE;\n"
|
||||||
|
" END IF;\n"
|
||||||
|
" END;".
|
||||||
|
|
||||||
|
sql_check_table_exist() ->
|
||||||
|
"SELECT COUNT(*) FROM user_tables WHERE table_name = 'MQTT_TEST'".
|
||||||
|
|
||||||
reset_table(Config) ->
|
reset_table(Config) ->
|
||||||
ResourceId = resource_id(Config),
|
ResourceId = resource_id(Config),
|
||||||
_ = emqx_resource:simple_sync_query(ResourceId, {sql, sql_drop_table()}),
|
drop_table_if_exists(Config),
|
||||||
{ok, [{proc_result, 0, _}]} = emqx_resource:simple_sync_query(
|
{ok, [{proc_result, 0, _}]} = emqx_resource:simple_sync_query(
|
||||||
ResourceId, {sql, sql_create_table()}
|
ResourceId, {sql, sql_create_table()}
|
||||||
),
|
),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
drop_table(Config) ->
|
drop_table_if_exists(Config) ->
|
||||||
ResourceId = resource_id(Config),
|
ResourceId = resource_id(Config),
|
||||||
emqx_resource:simple_sync_query(ResourceId, {query, sql_drop_table()}),
|
{ok, [{proc_result, 0, _}]} =
|
||||||
|
emqx_resource:simple_sync_query(ResourceId, {query, sql_drop_table()}),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
oracle_config(TestCase, _ConnectionType, Config) ->
|
oracle_config(TestCase, _ConnectionType, Config) ->
|
||||||
|
@ -394,6 +407,12 @@ t_batch_sync_query(Config) ->
|
||||||
emqx_bridge:send_message(BridgeId, Params),
|
emqx_bridge:send_message(BridgeId, Params),
|
||||||
ok
|
ok
|
||||||
end),
|
end),
|
||||||
|
% Wait for reconnection.
|
||||||
|
?retry(
|
||||||
|
_Sleep = 1_000,
|
||||||
|
_Attempts = 30,
|
||||||
|
?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId))
|
||||||
|
),
|
||||||
?retry(
|
?retry(
|
||||||
_Sleep = 1_000,
|
_Sleep = 1_000,
|
||||||
_Attempts = 30,
|
_Attempts = 30,
|
||||||
|
@ -529,3 +548,32 @@ t_no_sid_nor_service_name(Config0) ->
|
||||||
create_bridge(Config)
|
create_bridge(Config)
|
||||||
),
|
),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
|
t_table_removed(Config) ->
|
||||||
|
ResourceId = resource_id(Config),
|
||||||
|
?check_trace(
|
||||||
|
begin
|
||||||
|
?assertMatch({ok, _}, create_bridge_api(Config)),
|
||||||
|
?retry(
|
||||||
|
_Sleep = 1_000,
|
||||||
|
_Attempts = 20,
|
||||||
|
?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId))
|
||||||
|
),
|
||||||
|
drop_table_if_exists(Config),
|
||||||
|
MsgId = erlang:unique_integer(),
|
||||||
|
Params = #{
|
||||||
|
topic => ?config(mqtt_topic, Config),
|
||||||
|
id => MsgId,
|
||||||
|
payload => ?config(oracle_name, Config),
|
||||||
|
retain => true
|
||||||
|
},
|
||||||
|
Message = {send_message, Params},
|
||||||
|
?assertEqual(
|
||||||
|
{error, {unrecoverable_error, {942, "ORA-00942: table or view does not exist\n"}}},
|
||||||
|
emqx_resource:simple_sync_query(ResourceId, Message)
|
||||||
|
),
|
||||||
|
ok
|
||||||
|
end,
|
||||||
|
[]
|
||||||
|
),
|
||||||
|
ok.
|
||||||
|
|
|
@ -1,6 +1,6 @@
|
||||||
{application, emqx_oracle, [
|
{application, emqx_oracle, [
|
||||||
{description, "EMQX Enterprise Oracle Database Connector"},
|
{description, "EMQX Enterprise Oracle Database Connector"},
|
||||||
{vsn, "0.1.1"},
|
{vsn, "0.1.2"},
|
||||||
{registered, []},
|
{registered, []},
|
||||||
{applications, [
|
{applications, [
|
||||||
kernel,
|
kernel,
|
||||||
|
|
|
@ -346,6 +346,10 @@ to_bin(Bin) when is_binary(Bin) ->
|
||||||
to_bin(Atom) when is_atom(Atom) ->
|
to_bin(Atom) when is_atom(Atom) ->
|
||||||
erlang:atom_to_binary(Atom).
|
erlang:atom_to_binary(Atom).
|
||||||
|
|
||||||
|
handle_result({error, {recoverable_error, _Error}} = Res) ->
|
||||||
|
Res;
|
||||||
|
handle_result({error, {unrecoverable_error, _Error}} = Res) ->
|
||||||
|
Res;
|
||||||
handle_result({error, disconnected}) ->
|
handle_result({error, disconnected}) ->
|
||||||
{error, {recoverable_error, disconnected}};
|
{error, {recoverable_error, disconnected}};
|
||||||
handle_result({error, Error}) ->
|
handle_result({error, Error}) ->
|
||||||
|
@ -354,6 +358,8 @@ handle_result({error, socket, closed} = Error) ->
|
||||||
{error, {recoverable_error, Error}};
|
{error, {recoverable_error, Error}};
|
||||||
handle_result({error, Type, Reason}) ->
|
handle_result({error, Type, Reason}) ->
|
||||||
{error, {unrecoverable_error, {Type, Reason}}};
|
{error, {unrecoverable_error, {Type, Reason}}};
|
||||||
|
handle_result({ok, [{proc_result, RetCode, Reason}]}) when RetCode =/= 0 ->
|
||||||
|
{error, {unrecoverable_error, {RetCode, Reason}}};
|
||||||
handle_result(Res) ->
|
handle_result(Res) ->
|
||||||
Res.
|
Res.
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue