From 5f1093609181b74a7b2223c24bfd10667498bb5f Mon Sep 17 00:00:00 2001 From: Paulo Zulato Date: Tue, 9 May 2023 19:49:12 -0300 Subject: [PATCH] feat(oracle): check whether target table exists Fixes https://emqx.atlassian.net/browse/EMQX-9026 --- .../test/emqx_bridge_oracle_SUITE.erl | 80 +++++++-- apps/emqx_oracle/src/emqx_oracle.erl | 155 +++++++++++++----- 2 files changed, 185 insertions(+), 50 deletions(-) diff --git a/apps/emqx_bridge_oracle/test/emqx_bridge_oracle_SUITE.erl b/apps/emqx_bridge_oracle/test/emqx_bridge_oracle_SUITE.erl index 06b0256e2..d7c7cec74 100644 --- a/apps/emqx_bridge_oracle/test/emqx_bridge_oracle_SUITE.erl +++ b/apps/emqx_bridge_oracle/test/emqx_bridge_oracle_SUITE.erl @@ -179,18 +179,39 @@ sql_drop_table() -> sql_check_table_exist() -> "SELECT COUNT(*) FROM user_tables WHERE table_name = 'MQTT_TEST'". +new_jamdb_connection(Config) -> + JamdbOpts = [ + {host, ?config(oracle_host, Config)}, + {port, ?config(oracle_port, Config)}, + {user, "system"}, + {password, "oracle"}, + {sid, ?SID} + ], + jamdb_oracle:start(JamdbOpts). + +close_jamdb_connection(Conn) -> + jamdb_oracle:stop(Conn). + reset_table(Config) -> - ResourceId = resource_id(Config), - drop_table_if_exists(Config), - {ok, [{proc_result, 0, _}]} = emqx_resource:simple_sync_query( - ResourceId, {sql, sql_create_table()} - ), + {ok, Conn} = new_jamdb_connection(Config), + try + ok = drop_table_if_exists(Conn), + {ok, [{proc_result, 0, _}]} = jamdb_oracle:sql_query(Conn, sql_create_table()) + after + close_jamdb_connection(Conn) + end, ok. +drop_table_if_exists(Conn) when is_pid(Conn) -> + {ok, [{proc_result, 0, _}]} = jamdb_oracle:sql_query(Conn, sql_drop_table()), + ok; drop_table_if_exists(Config) -> - ResourceId = resource_id(Config), - {ok, [{proc_result, 0, _}]} = - emqx_resource:simple_sync_query(ResourceId, {query, sql_drop_table()}), + {ok, Conn} = new_jamdb_connection(Config), + try + ok = drop_table_if_exists(Conn) + after + close_jamdb_connection(Conn) + end, ok. oracle_config(TestCase, _ConnectionType, Config) -> @@ -216,7 +237,7 @@ oracle_config(TestCase, _ConnectionType, Config) -> " pool_size = 1\n" " sql = \"~s\"\n" " resource_opts = {\n" - " health_check_interval = \"5s\"\n" + " health_check_interval = \"15s\"\n" " request_ttl = \"30s\"\n" " query_mode = \"async\"\n" " batch_size = 3\n" @@ -349,13 +370,13 @@ t_sync_query(Config) -> ResourceId = resource_id(Config), ?check_trace( begin + reset_table(Config), ?assertMatch({ok, _}, create_bridge_api(Config)), ?retry( _Sleep = 1_000, _Attempts = 20, ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId)) ), - reset_table(Config), MsgId = erlang:unique_integer(), Params = #{ topic => ?config(mqtt_topic, Config), @@ -381,13 +402,13 @@ t_batch_sync_query(Config) -> BridgeId = bridge_id(Config), ?check_trace( begin + reset_table(Config), ?assertMatch({ok, _}, create_bridge_api(Config)), ?retry( _Sleep = 1_000, _Attempts = 30, ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId)) ), - reset_table(Config), MsgId = erlang:unique_integer(), Params = #{ topic => ?config(mqtt_topic, Config), @@ -464,6 +485,7 @@ t_start_stop(Config) -> ResourceId = resource_id(Config), ?check_trace( begin + reset_table(Config), ?assertMatch({ok, _}, create_bridge(Config)), %% Since the connection process is async, we give it some time to %% stabilize and avoid flakiness. @@ -515,6 +537,7 @@ t_on_get_status(Config) -> ProxyHost = ?config(proxy_host, Config), ProxyName = ?config(proxy_name, Config), ResourceId = resource_id(Config), + reset_table(Config), ?assertMatch({ok, _}, create_bridge(Config)), %% Since the connection process is async, we give it some time to %% stabilize and avoid flakiness. @@ -547,10 +570,45 @@ t_no_sid_nor_service_name(Config0) -> ), ok. +t_missing_table(Config) -> + ResourceId = resource_id(Config), + ?check_trace( + begin + drop_table_if_exists(Config), + ?assertMatch({ok, _}, create_bridge_api(Config)), + ?retry( + _Sleep = 1_000, + _Attempts = 20, + ?assertMatch( + {ok, Status} when Status =:= disconnected orelse Status =:= connecting, + emqx_resource_manager:health_check(ResourceId) + ) + ), + MsgId = erlang:unique_integer(), + Params = #{ + topic => ?config(mqtt_topic, Config), + id => MsgId, + payload => ?config(oracle_name, Config), + retain => true + }, + Message = {send_message, Params}, + ?assertMatch( + {error, {resource_error, #{reason := not_connected}}}, + emqx_resource:simple_sync_query(ResourceId, Message) + ), + ok + end, + fun(Trace) -> + ?assertNotMatch([], ?of_kind(oracle_undefined_table, Trace)), + ok + end + ). + t_table_removed(Config) -> ResourceId = resource_id(Config), ?check_trace( begin + reset_table(Config), ?assertMatch({ok, _}, create_bridge_api(Config)), ?retry( _Sleep = 1_000, diff --git a/apps/emqx_oracle/src/emqx_oracle.erl b/apps/emqx_oracle/src/emqx_oracle.erl index 61952b96b..2d190057c 100644 --- a/apps/emqx_oracle/src/emqx_oracle.erl +++ b/apps/emqx_oracle/src/emqx_oracle.erl @@ -9,8 +9,6 @@ -include_lib("emqx/include/logger.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl"). --define(ORACLE_DEFAULT_PORT, 1521). - %%==================================================================== %% Exports %%==================================================================== @@ -26,7 +24,7 @@ ]). %% callbacks for ecpool --export([connect/1, prepare_sql_to_conn/2]). +-export([connect/1, prepare_sql_to_conn/3]). %% Internal exports used to execute code with ecpool worker -export([ @@ -39,18 +37,15 @@ oracle_host_options/0 ]). --define(ACTION_SEND_MESSAGE, send_message). - +-define(ORACLE_DEFAULT_PORT, 1521). -define(SYNC_QUERY_MODE, no_handover). - +-define(DEFAULT_POOL_SIZE, 8). +-define(OPT_TIMEOUT, 30000). +-define(MAX_CURSORS, 10). -define(ORACLE_HOST_OPTIONS, #{ default_port => ?ORACLE_DEFAULT_PORT }). --define(MAX_CURSORS, 10). --define(DEFAULT_POOL_SIZE, 8). --define(OPT_TIMEOUT, 30000). - -type prepares() :: #{atom() => binary()}. -type params_tokens() :: #{atom() => list()}. @@ -105,7 +100,7 @@ on_start( ], PoolName = InstId, Prepares = parse_prepare_sql(Config), - InitState = #{pool_name => PoolName, prepare_statement => #{}}, + InitState = #{pool_name => PoolName}, State = maps:merge(InitState, Prepares), case emqx_resource_pool:start(InstId, ?MODULE, Options) of ok -> @@ -148,7 +143,7 @@ on_query( on_batch_query( InstId, BatchReq, - #{pool_name := PoolName, params_tokens := Tokens, prepare_statement := Sts} = State + #{pool_name := PoolName, params_tokens := Tokens, prepare_sql := Sts} = State ) -> case BatchReq of [{Key, _} = Request | _] -> @@ -241,7 +236,13 @@ on_get_status(_InstId, #{pool_name := Pool} = State) -> connected; {ok, NState} -> %% return new state with prepared statements - {connected, NState} + {connected, NState}; + {error, _Reason} -> + %% do not log error, it is logged in prepare_sql_to_conn + connecting; + {undefined_table, NState} -> + %% return new state indicating that we are connected but the target table is not created + {disconnected, NState, unhealthy_target} end; false -> disconnected @@ -250,11 +251,42 @@ on_get_status(_InstId, #{pool_name := Pool} = State) -> do_get_status(Conn) -> ok == element(1, jamdb_oracle:sql_query(Conn, "select 1 from dual")). -do_check_prepares(#{prepare_sql := Prepares}) when is_map(Prepares) -> - ok; -do_check_prepares(State = #{pool_name := PoolName, prepare_sql := {error, Prepares}}) -> - {ok, Sts} = prepare_sql(Prepares, PoolName), - {ok, State#{prepare_sql => Prepares, prepare_statement := Sts}}. +do_check_prepares( + #{ + pool_name := PoolName, + prepare_sql := #{<<"send_message">> := SQL}, + params_tokens := #{<<"send_message">> := Tokens} + } = State +) -> + % it's already connected. Verify if target table still exists + Workers = [Worker || {_WorkerName, Worker} <- ecpool:workers(PoolName)], + lists:foldl( + fun + (WorkerPid, ok) -> + {ok, Conn} = ecpool_worker:client(WorkerPid), + case check_if_table_exists(Conn, SQL, Tokens) of + {error, undefined_table} -> {undefined_table, State}; + _ -> ok + end; + (_, Acc) -> + Acc + end, + ok, + Workers + ); +do_check_prepares( + State = #{pool_name := PoolName, prepare_sql := {error, Prepares}, params_tokens := TokensMap} +) -> + case prepare_sql(Prepares, PoolName, TokensMap) of + %% remove the error + {ok, Sts} -> + {ok, State#{prepare_sql => Sts}}; + {error, undefined_table} -> + %% indicate the error + {undefined_table, State#{prepare_sql => {error, Prepares}}}; + {error, _Reason} = Error -> + Error + end. %% =================================================================== @@ -312,36 +344,81 @@ parse_prepare_sql([], Prepares, Tokens) -> params_tokens => Tokens }. -init_prepare(State = #{prepare_sql := Prepares, pool_name := PoolName}) -> - {ok, Sts} = prepare_sql(Prepares, PoolName), - State#{prepare_statement := Sts}. +init_prepare(State = #{prepare_sql := Prepares, pool_name := PoolName, params_tokens := TokensMap}) -> + case prepare_sql(Prepares, PoolName, TokensMap) of + {ok, Sts} -> + State#{prepare_sql := Sts}; + Error -> + LogMeta = #{ + msg => <<"Oracle Database init prepare statement failed">>, error => Error + }, + ?SLOG(error, LogMeta), + %% mark the prepare_sql as failed + State#{prepare_sql => {error, Prepares}} + end. -prepare_sql(Prepares, PoolName) when is_map(Prepares) -> - prepare_sql(maps:to_list(Prepares), PoolName); -prepare_sql(Prepares, PoolName) -> - Data = do_prepare_sql(Prepares, PoolName), - {ok, _Sts} = Data, - ecpool:add_reconnect_callback(PoolName, {?MODULE, prepare_sql_to_conn, [Prepares]}), - Data. +prepare_sql(Prepares, PoolName, TokensMap) when is_map(Prepares) -> + prepare_sql(maps:to_list(Prepares), PoolName, TokensMap); +prepare_sql(Prepares, PoolName, TokensMap) -> + case do_prepare_sql(Prepares, PoolName, TokensMap) of + {ok, _Sts} = Ok -> + %% prepare for reconnect + ecpool:add_reconnect_callback(PoolName, {?MODULE, prepare_sql_to_conn, [Prepares]}), + Ok; + Error -> + Error + end. -do_prepare_sql(Prepares, PoolName) -> - do_prepare_sql(ecpool:workers(PoolName), Prepares, PoolName, #{}). +do_prepare_sql(Prepares, PoolName, TokensMap) -> + do_prepare_sql(ecpool:workers(PoolName), Prepares, PoolName, TokensMap, #{}). -do_prepare_sql([{_Name, Worker} | T], Prepares, PoolName, _LastSts) -> +do_prepare_sql([{_Name, Worker} | T], Prepares, PoolName, TokensMap, _LastSts) -> {ok, Conn} = ecpool_worker:client(Worker), - {ok, Sts} = prepare_sql_to_conn(Conn, Prepares), - do_prepare_sql(T, Prepares, PoolName, Sts); -do_prepare_sql([], _Prepares, _PoolName, LastSts) -> + case prepare_sql_to_conn(Conn, Prepares, TokensMap) of + {ok, Sts} -> + do_prepare_sql(T, Prepares, PoolName, TokensMap, Sts); + Error -> + Error + end; +do_prepare_sql([], _Prepares, _PoolName, _TokensMap, LastSts) -> {ok, LastSts}. -prepare_sql_to_conn(Conn, Prepares) -> - prepare_sql_to_conn(Conn, Prepares, #{}). +prepare_sql_to_conn(Conn, Prepares, TokensMap) -> + prepare_sql_to_conn(Conn, Prepares, TokensMap, #{}). -prepare_sql_to_conn(Conn, [], Statements) when is_pid(Conn) -> {ok, Statements}; -prepare_sql_to_conn(Conn, [{Key, SQL} | PrepareList], Statements) when is_pid(Conn) -> +prepare_sql_to_conn(Conn, [], _TokensMap, Statements) when is_pid(Conn) -> {ok, Statements}; +prepare_sql_to_conn(Conn, [{Key, SQL} | PrepareList], TokensMap, Statements) when is_pid(Conn) -> LogMeta = #{msg => "Oracle Database Prepare Statement", name => Key, prepare_sql => SQL}, + Tokens = maps:get(Key, TokensMap, []), ?SLOG(info, LogMeta), - prepare_sql_to_conn(Conn, PrepareList, Statements#{Key => SQL}). + case check_if_table_exists(Conn, SQL, Tokens) of + ok -> + ?SLOG(info, LogMeta#{result => success}), + prepare_sql_to_conn(Conn, PrepareList, TokensMap, Statements#{Key => SQL}); + {error, undefined_table} = Error -> + %% Target table is not created + ?SLOG(error, LogMeta#{result => failed, reason => "table does not exist"}), + ?tp(oracle_undefined_table, #{}), + Error; + Error -> + Error + end. + +check_if_table_exists(Conn, SQL, Tokens) -> + {Event, _Headers} = emqx_rule_events:eventmsg_publish( + emqx_message:make(<<"t/opic">>, "test query") + ), + SqlQuery = "begin " ++ binary_to_list(SQL) ++ "; rollback; end;", + Params = emqx_placeholder:proc_sql(Tokens, Event), + case jamdb_oracle:sql_query(Conn, {SqlQuery, Params}) of + {ok, [{proc_result, 0, _Description}]} -> + ok; + {ok, [{proc_result, 6550, _Description}]} -> + %% Target table is not created + {error, undefined_table}; + Reason -> + {error, Reason} + end. to_bin(Bin) when is_binary(Bin) -> Bin;