%%--------------------------------------------------------------------
%% Copyright (c) 2023-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------

-module(emqx_oracle).

-behaviour(emqx_resource).

-include_lib("emqx_resource/include/emqx_resource.hrl").
-include_lib("emqx/include/logger.hrl").
-include_lib("emqx/include/emqx_trace.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl").

-define(UNHEALTHY_TARGET_MSG,
    "Oracle table is invalid. Please check if the table exists in Oracle Database."
).

%%====================================================================
%% Exports
%%====================================================================

%% callbacks for behaviour emqx_resource
-export([
    resource_type/0,
    callback_mode/0,
    on_start/2,
    on_stop/2,
    on_query/3,
    on_batch_query/3,
    on_get_status/2,
    on_add_channel/4,
    on_remove_channel/3,
    on_get_channels/1,
    on_get_channel_status/3
]).

%% callbacks for ecpool
-export([connect/1, prepare_sql_to_conn/3]).

%% Internal exports used to execute code with ecpool worker
-export([
    query/3,
    execute_batch/3,
    do_get_status/1
]).

-export([
    oracle_host_options/0
]).

-define(ORACLE_DEFAULT_PORT, 1521).
-define(SYNC_QUERY_MODE, no_handover).
-define(DEFAULT_POOL_SIZE, 8).
-define(OPT_TIMEOUT, 30000).
-define(MAX_CURSORS, 10).
-define(ORACLE_HOST_OPTIONS, #{
    default_port => ?ORACLE_DEFAULT_PORT
}).

%% Statement templates and their placeholder tokens, keyed by channel id
%% (or by the keys of a user supplied `prepare_statement' map).
-type prepares() :: #{atom() | binary() => binary()}.
-type params_tokens() :: #{atom() | binary() => list()}.

%% Per-channel state. `prepare_sql' is replaced by `{error, Prepares}' when
%% the statements could not be validated against the database.
-type channel_state() ::
    #{
        prepare_sql := prepares() | {error, prepares()},
        params_tokens := params_tokens()
    }.

%% Connector state as built by on_start/2 and updated by
%% on_add_channel/4 and on_remove_channel/3.
-type state() ::
    #{
        pool_name := binary(),
        installed_channels := #{binary() => channel_state()}
    }.

%% Resource type reported to emqx_resource.
resource_type() -> oracle.

% As ecpool is not monitoring the worker's PID when doing a handover_async, the
% request can be lost if worker crashes. Thus, it's better to force requests to
% be sync for now.
callback_mode() -> always_sync.

-spec on_start(binary(), hocon:config()) -> {ok, state()} | {error, _}.
%% Starts the connector: makes sure `ecpool' and `jamdb_oracle' are running,
%% derives the connection options from `Config' and starts a worker pool named
%% after `InstId'. The returned state has no channels installed yet.
on_start(InstId, #{server := Server, username := User} = Config) ->
    ?SLOG(info, #{
        msg => "starting_oracle_connector",
        connector => InstId,
        config => emqx_utils:redact(Config)
    }),
    ?tp(oracle_bridge_started, #{instance_id => InstId, config => Config}),
    {ok, _} = application:ensure_all_started(ecpool),
    {ok, _} = application:ensure_all_started(jamdb_oracle),
    jamdb_oracle_conn:set_max_cursors_number(?MAX_CURSORS),

    #{hostname := Hostname, port := PortNumber} =
        emqx_schema:parse_server(Server, oracle_host_options()),
    RawSid = maps:get(sid, Config, ""),
    %% The service name is optional: it is only stringified when configured.
    ServiceNameStr =
        case Config of
            #{service_name := RawServiceName} when RawServiceName =/= undefined ->
                emqx_utils_conv:str(RawServiceName);
            _ ->
                undefined
        end,
    PoolOptions = [
        {host, Hostname},
        {port, PortNumber},
        {user, emqx_utils_conv:str(User)},
        {password, maps:get(password, Config, "")},
        {sid, emqx_utils_conv:str(RawSid)},
        {service_name, ServiceNameStr},
        {pool_size, maps:get(pool_size, Config, ?DEFAULT_POOL_SIZE)},
        {timeout, ?OPT_TIMEOUT},
        {app_name, "EMQX Data To Oracle Database Action"}
    ],
    case emqx_resource_pool:start(InstId, ?MODULE, PoolOptions) of
        ok ->
            %% The pool is named after the resource instance id.
            {ok, #{pool_name => InstId, installed_channels => #{}}};
        {error, Reason} = StartError ->
            ?tp(oracle_connector_start_failed, #{error => Reason}),
            StartError
    end.

%% Stops the worker pool recorded in the connector state.
on_stop(InstId, #{pool_name := Pool}) ->
    ?SLOG(info, #{
        msg => "stopping_oracle_connector",
        connector => InstId
    }),
    ?tp(oracle_bridge_stopped, #{instance_id => InstId}),
    emqx_resource_pool:stop(Pool).

%% Builds the state of a new channel and stores it under `ChannelId' in the
%% connector state. Crashes (badmatch) if the channel state cannot be created.
on_add_channel(
    _InstId,
    #{installed_channels := Channels, pool_name := Pool} = ConnState,
    ChannelId,
    ChannelConfig
) ->
    {ok, ChanState} = create_channel_state(ChannelId, Pool, ChannelConfig),
    {ok, ConnState#{installed_channels := Channels#{ChannelId => ChanState}}}.
%% Parses the SQL template(s) found under `parameters' in the channel config
%% and validates them against the database through the pool workers.
create_channel_state(ChannelId, PoolName, #{parameters := Params}) ->
    {ok, init_prepare(PoolName, parse_prepare_sql(ChannelId, Params))}.

%% Drops `ChannelId' from the installed channels. Removing an unknown channel
%% leaves the state unchanged.
on_remove_channel(_InstId, #{installed_channels := Channels} = ConnState, ChannelId) ->
    {ok, ConnState#{installed_channels := maps:remove(ChannelId, Channels)}}.

%% Reports the health of a single channel by re-checking its statement
%% against the database.
on_get_channel_status(
    _ResId,
    ChannelId,
    #{pool_name := Pool, installed_channels := Channels}
) ->
    ChanState = maps:get(ChannelId, Channels),
    case do_check_prepares(ChannelId, Pool, ChanState) of
        ok ->
            ?status_connected;
        {error, undefined_table} ->
            %% The connection works but the target table is missing.
            {?status_disconnected, {unhealthy_target, ?UNHEALTHY_TARGET_MSG}};
        {error, _Other} ->
            %% Not logged here: prepare_sql_to_conn already logs the failure.
            ?status_connecting
    end.

%% Lists the channels configured for this connector.
on_get_channels(ResId) ->
    emqx_bridge_v2:get_channels_for_connector(ResId).

%% Runs a single query synchronously.
%% A `{TypeOrKey, NameOrSQL}' request is treated as having no parameters.
%% `TypeOrKey' is either the atom `query' (raw SQL) or a channel id whose
%% prepared template is rendered with the request data.
on_query(InstId, {TypeOrKey, NameOrSQL}, #{pool_name := _} = ConnState) ->
    on_query(InstId, {TypeOrKey, NameOrSQL, []}, ConnState);
on_query(
    InstId,
    {TypeOrKey, NameOrSQL, Params},
    #{pool_name := Pool, installed_channels := Channels}
) ->
    %% Raw `query' requests have no channel entry, hence the empty default.
    ChanState = maps:get(TypeOrKey, Channels, #{}),
    ?SLOG(debug, #{
        msg => "oracle_connector_received_sql_query",
        connector => InstId,
        type => TypeOrKey,
        sql => NameOrSQL,
        state => ChanState
    }),
    {Statement, Args} = proc_sql_params(TypeOrKey, NameOrSQL, Params, ChanState),
    handle_result(
        on_sql_query(InstId, TypeOrKey, Pool, query, ?SYNC_QUERY_MODE, Statement, Args)
    ).
%% Executes a batch of requests. The channel is taken from the first request
%% of the batch; all requests are rendered with that channel's tokens.
%%
%% Returns `{ok, AffectedRows}' on success, or an error tuple:
%%   * `invalid_request'                 - the batch is not a non-empty list
%%                                         of `{Key, Data}' pairs;
%%   * `batch_prepare_not_implemented'   - the channel has no tokens;
%%   * `prepared_statement_invalid'      - init_prepare/2 marked the channel's
%%                                         statements as failed.
on_batch_query(
    InstId,
    BatchReq,
    #{pool_name := PoolName, installed_channels := Channels} = ConnectorState
) ->
    case BatchReq of
        [{Key, _} = Request | _] ->
            BinKey = to_bin(Key),
            State = maps:get(BinKey, Channels),
            #{params_tokens := Tokens, prepare_sql := Sts} = State,
            case {maps:get(BinKey, Tokens, undefined), Sts} of
                {undefined, _} ->
                    ?SLOG(error, #{
                        connector => InstId,
                        first_request => Request,
                        state => State,
                        msg => "batch_prepare_not_implemented"
                    }),
                    {error, {unrecoverable_error, batch_prepare_not_implemented}};
                {_TokenList, {error, _FailedPrepares}} ->
                    %% init_prepare/2 stores `{error, Prepares}' when the
                    %% statement check failed; looking the statement up in it
                    %% would crash with badmap, so fail explicitly instead.
                    ?SLOG(error, #{
                        connector => InstId,
                        first_request => Request,
                        state => State,
                        msg => "prepared_statement_invalid"
                    }),
                    {error, {unrecoverable_error, prepared_statement_invalid}};
                {TokenList, _} ->
                    run_prepared_batch(InstId, BinKey, PoolName, Sts, TokenList, BatchReq)
            end;
        _ ->
            ?SLOG(error, #{
                connector => InstId,
                request => BatchReq,
                state => ConnectorState,
                msg => "invalid_request"
            }),
            {error, {unrecoverable_error, invalid_request}}
    end.

%% Renders every request of the batch with `TokenList' and runs them as one
%% batch statement, summing the affected rows on success.
run_prepared_batch(InstId, BinKey, PoolName, Sts, TokenList, BatchReq) ->
    {_, Datas} = lists:unzip(BatchReq),
    Datas2 = [emqx_placeholder:proc_sql(TokenList, Data) || Data <- Datas],
    St = maps:get(BinKey, Sts),
    case
        on_sql_query(
            InstId, BinKey, PoolName, execute_batch, ?SYNC_QUERY_MODE, St, Datas2
        )
    of
        {ok, Results} ->
            handle_batch_result(Results, 0);
        Result ->
            Result
    end.

%% Resolves what to send to the database for a single query.
%% For the `query' type the SQL and parameters are passed through untouched.
%% Otherwise `TypeOrKey' names a channel: when both tokens and a statement are
%% known for it, the statement is returned together with the parameters
%% rendered from `SQLOrData'; if either is missing the input is passed through.
proc_sql_params(query, SQLOrKey, Params, _State) ->
    {SQLOrKey, Params};
proc_sql_params(TypeOrKey, SQLOrData, Params, #{
    params_tokens := ParamsTokens, prepare_sql := PrepareSql
}) ->
    Key = to_bin(TypeOrKey),
    case maps:get(Key, ParamsTokens, undefined) of
        undefined ->
            {SQLOrData, Params};
        Tokens ->
            case maps:get(Key, PrepareSql, undefined) of
                undefined ->
                    {SQLOrData, Params};
                Sql ->
                    {Sql, emqx_placeholder:proc_sql(Tokens, SQLOrData)}
            end
    end.
%% Traces the rendered request and runs `?MODULE:Type(Conn, NameOrSQL, Data)'
%% on a pool worker. An empty pool is reported as a recoverable error; every
%% other result is returned as is.
on_sql_query(InstId, ChannelID, PoolName, Type, ApplyMode, NameOrSQL, Data) ->
    TraceData = #emqx_trace_format_func_data{
        function = fun trace_format_data/1, data = Data
    },
    emqx_trace:rendered_action_template(ChannelID, #{
        type => Type,
        apply_mode => ApplyMode,
        name_or_sql => NameOrSQL,
        data => TraceData
    }),
    Outcome = ecpool:pick_and_do(PoolName, {?MODULE, Type, [NameOrSQL, Data]}, ApplyMode),
    case Outcome of
        {error, Reason} ->
            ?tp(
                oracle_connector_query_return,
                #{error => Reason}
            ),
            ?SLOG(error, #{
                msg => "oracle_connector_do_sql_query_failed",
                connector => InstId,
                type => Type,
                sql => NameOrSQL,
                reason => Reason
            }),
            case Outcome of
                {error, ecpool_empty} -> {error, {recoverable_error, ecpool_empty}};
                _ -> Outcome
            end;
        _ ->
            ?tp(
                oracle_connector_query_return,
                #{result => Outcome}
            ),
            Outcome
    end.

%% Formats query data for tracing. Batch requests carry a two level list, so
%% nested lists get their own array marker as well.
trace_format_data(Data0) ->
    Marked = lists:map(fun insert_array_marker_if_list/1, Data0),
    {'$array$', Marked}.

%% Wraps lists in an array marker and leaves any other term untouched.
insert_array_marker_if_list(Item) ->
    case is_list(Item) of
        true -> {'$array$', Item};
        false -> Item
    end.

%% Connector health: connected only if the pool health check succeeds.
on_get_status(_InstId, #{pool_name := PoolName}) ->
    Healthy = emqx_resource_pool:health_check_workers(PoolName, fun ?MODULE:do_get_status/1),
    case Healthy of
        true -> ?status_connected;
        false -> ?status_disconnected
    end.

%% Health probe executed on a worker connection.
do_get_status(Conn) ->
    Reply = jamdb_oracle:sql_query(Conn, "select 1 from dual"),
    element(1, Reply) =:= ok.

%% Checks whether the channel's statement is still usable.
%% A channel whose statements were marked as failed is always reported as
%% `{error, undefined_table}'. Otherwise the statement is re-checked on the
%% pool workers, stopping at the first one that reports a missing table.
%% Workers without a client, and any other check failure, count as ok.
do_check_prepares(_ChannelId, _PoolName, #{prepare_sql := {error, _Prepares}}) ->
    {error, undefined_table};
do_check_prepares(ChannelId, PoolName, State) ->
    #{
        prepare_sql := #{ChannelId := SQL},
        params_tokens := #{ChannelId := Tokens}
    } = State,
    % it's already connected. Verify if target table still exists
    WorkerPids = [Pid || {_Name, Pid} <- ecpool:workers(PoolName)],
    IsTableMissing = fun(WorkerPid) ->
        case ecpool_worker:client(WorkerPid) of
            {ok, Conn} ->
                check_if_table_exists(Conn, SQL, Tokens) =:= {error, undefined_table};
            _ ->
                false
        end
    end,
    case lists:any(IsTableMissing, WorkerPids) of
        true -> {error, undefined_table};
        false -> ok
    end.
%% ===================================================================

%% Options used when parsing the `server' config entry.
oracle_host_options() ->
    ?ORACLE_HOST_OPTIONS.

%% ecpool callback: opens one database connection.
connect(Opts) ->
    jamdb_oracle:start_link(Opts).

%% Converts a statement to the string form handed to jamdb_oracle.
sql_query_to_str(SqlQuery) ->
    emqx_utils_conv:str(SqlQuery).

%% Converts a list of parameters to strings; booleans become "1" / "0".
sql_params_to_str(Params) when is_list(Params) ->
    lists:map(fun sql_param_to_str/1, Params).

%% Converts one parameter to its string form.
sql_param_to_str(false) -> "0";
sql_param_to_str(true) -> "1";
sql_param_to_str(Value) -> emqx_utils_conv:str(Value).

%% Runs one statement on `Conn' and normalizes the result.
query(Conn, SQL, Params) ->
    Request = {sql_query_to_str(SQL), sql_params_to_str(Params)},
    Reply = jamdb_oracle:sql_query(Conn, Request),
    ?tp(oracle_query, #{conn => Conn, sql => SQL, params => Params, result => Reply}),
    handle_result(Reply).

%% Runs one statement on `Conn' once per parameter list, as a single batch,
%% and normalizes the result.
execute_batch(Conn, SQL, ParamsList) ->
    StrParamsList = lists:map(fun sql_params_to_str/1, ParamsList),
    Reply = jamdb_oracle:sql_query(Conn, {batch, sql_query_to_str(SQL), StrParamsList}),
    ?tp(oracle_batch_query, #{conn => Conn, sql => SQL, params => ParamsList, result => Reply}),
    handle_result(Reply).

%% Extracts the SQL template(s) from a channel config and preprocesses them.
%% `prepare_statement' wins over `sql'; a lone `sql' template is keyed by the
%% channel id; with neither configured the result holds no statements.
parse_prepare_sql(ChannelId, Config) ->
    Statements = maps:get(prepare_statement, Config, undefined),
    Template = maps:get(sql, Config, undefined),
    Templates =
        if
            Statements =/= undefined -> Statements;
            Template =/= undefined -> #{ChannelId => Template};
            true -> #{}
        end,
    parse_prepare_sql(maps:to_list(Templates), #{}, #{}).

%% Preprocesses each `{Key, Template}' pair, accumulating the resulting SQL
%% and placeholder tokens per key on top of the given maps.
parse_prepare_sql(Pairs, Prepares0, Tokens0) ->
    Step = fun({Key, Template}, {PreparesAcc, TokensAcc}) ->
        {PrepareSQL, ParamsTokens} = emqx_placeholder:preproc_sql(Template, ':n'),
        {PreparesAcc#{Key => PrepareSQL}, TokensAcc#{Key => ParamsTokens}}
    end,
    {Prepares, Tokens} = lists:foldl(Step, {Prepares0, Tokens0}, Pairs),
    #{
        prepare_sql => Prepares,
        params_tokens => Tokens
    }.

%% Validates the parsed statements on the pool. On failure the error is logged
%% and `prepare_sql' is replaced by `{error, Prepares}' to mark it as failed.
init_prepare(PoolName, #{prepare_sql := Prepares, params_tokens := TokensMap} = State) ->
    case prepare_sql(Prepares, PoolName, TokensMap) of
        {ok, Statements} ->
            State#{prepare_sql := Statements};
        Failure ->
            ?SLOG(error, #{
                msg => <<"oracle_init_prepare_statement_failed">>,
                error => Failure
            }),
            %% mark the prepare_sql as failed
            State#{prepare_sql => {error, Prepares}}
    end.
%% Validates the statements on every pool worker and, on success, registers a
%% reconnect callback so they are re-validated whenever a worker reconnects.
%% `Prepares' may be given as a map or as a list of `{Key, SQL}' pairs.
prepare_sql(Prepares, PoolName, TokensMap) when is_map(Prepares) ->
    prepare_sql(maps:to_list(Prepares), PoolName, TokensMap);
prepare_sql(Prepares, PoolName, TokensMap) ->
    case do_prepare_sql(Prepares, PoolName, TokensMap) of
        {ok, _Sts} = Ok ->
            %% prepare for reconnect
            %% ecpool prepends the client to the argument list, so the
            %% callback must carry both remaining arguments of the exported
            %% prepare_sql_to_conn/3; passing only `Prepares' would make
            %% ecpool call a non-existent prepare_sql_to_conn/2.
            ecpool:add_reconnect_callback(
                PoolName, {?MODULE, prepare_sql_to_conn, [Prepares, TokensMap]}
            ),
            Ok;
        Error ->
            Error
    end.

%% Validates the statements on each worker of the pool in turn.
do_prepare_sql(Prepares, PoolName, TokensMap) ->
    do_prepare_sql(ecpool:workers(PoolName), Prepares, PoolName, TokensMap, #{}).

%% Stops at the first worker that fails; otherwise returns the statements
%% reported by the last worker (an empty map when the pool has no workers).
do_prepare_sql([{_Name, Worker} | T], Prepares, PoolName, TokensMap, _LastSts) ->
    {ok, Conn} = ecpool_worker:client(Worker),
    case prepare_sql_to_conn(Conn, Prepares, TokensMap) of
        {ok, Sts} ->
            do_prepare_sql(T, Prepares, PoolName, TokensMap, Sts);
        Error ->
            Error
    end;
do_prepare_sql([], _Prepares, _PoolName, _TokensMap, LastSts) ->
    {ok, LastSts}.

%% Checks every `{Key, SQL}' statement on one connection.
%% Also used as the ecpool reconnect callback.
%% Returns `{ok, #{Key => SQL}}' or the first error encountered.
prepare_sql_to_conn(Conn, Prepares, TokensMap) ->
    prepare_sql_to_conn(Conn, Prepares, TokensMap, #{}).

prepare_sql_to_conn(Conn, [], _TokensMap, Statements) when is_pid(Conn) ->
    {ok, Statements};
prepare_sql_to_conn(Conn, [{Key, SQL} | PrepareList], TokensMap, Statements) when is_pid(Conn) ->
    LogMeta = #{msg => "oracle_prepare_statement", name => Key, prepare_sql => SQL},
    Tokens = maps:get(Key, TokensMap, []),
    ?SLOG(info, LogMeta),
    case check_if_table_exists(Conn, SQL, Tokens) of
        ok ->
            ?SLOG(info, LogMeta#{result => success}),
            prepare_sql_to_conn(Conn, PrepareList, TokensMap, Statements#{Key => SQL});
        {error, undefined_table} = Error ->
            %% Target table is not created
            ?SLOG(error, LogMeta#{result => failed, reason => "table does not exist"}),
            ?tp(oracle_undefined_table, #{}),
            Error;
        Error ->
            Error
    end.

%% Probes the statement by executing it inside an anonymous block that is
%% rolled back, using parameters rendered from a synthetic publish event.
%% Returns `ok', `{error, undefined_table}' or `{error, Reason}'.
check_if_table_exists(Conn, SQL, Tokens0) ->
    % Discard nested tokens for checking if table exist. As payload here is defined as
    % a single string, it would fail if Token is, for instance, ${payload.msg}, causing
    % bridge probe to fail.
    Tokens = lists:map(
        fun
            ({var, [Token | _DiscardedDeepTokens]}) ->
                {var, [Token]};
            (Token) ->
                Token
        end,
        Tokens0
    ),
    {Event, _Headers} = emqx_rule_events:eventmsg_publish(
        emqx_message:make(<<"t/opic">>, "test query")
    ),
    SqlQuery = "begin " ++ binary_to_list(SQL) ++ "; rollback; end;",
    Params = emqx_placeholder:proc_sql(Tokens, Event),
    case jamdb_oracle:sql_query(Conn, {SqlQuery, Params}) of
        {ok, [{proc_result, 0, _Description}]} ->
            ok;
        {ok, [{proc_result, 942, _Description}]} ->
            %% Target table is not created
            {error, undefined_table};
        {ok, [{proc_result, _, Description}]} ->
            % only the last result is returned, so we need to check on description if it
            % contains the "Table doesn't exist" error as it can not be the last one.
            % (for instance, the ORA-06550 can be the result value when table does not exist)
            ErrorCodes =
                case re:run(Description, <<"(ORA-[0-9]+)">>, [global, {capture, first, binary}]) of
                    {match, OraCodes} -> OraCodes;
                    _ -> []
                end,
            OraMap = maps:from_keys([ErrorCode || [ErrorCode] <- ErrorCodes], true),
            case OraMap of
                _ when is_map_key(<<"ORA-00942">>, OraMap) ->
                    % ORA-00942: table or view does not exist
                    {error, undefined_table};
                _ when is_map_key(<<"ORA-00932">>, OraMap) ->
                    % ORA-00932: inconsistent datatypes
                    % There is some type inconsistency with the table definition, but
                    % the table does exist. This inconsistency was probably caused by
                    % the token discarding done for this test query.
                    ok;
                _ ->
                    {error, Description}
            end;
        Reason ->
            {error, Reason}
    end.

%% Normalizes a channel key to a binary.
to_bin(Bin) when is_binary(Bin) ->
    Bin;
to_bin(Atom) when is_atom(Atom) ->
    erlang:atom_to_binary(Atom).
%% Normalizes a driver or pool result into the error classes understood by
%% the resource layer. Already classified errors pass through untouched;
%% `disconnected' and a closed socket are recoverable; every other error, as
%% well as a single non-zero procedure result, is unrecoverable. Anything
%% else is returned as is.
handle_result(Result) ->
    case Result of
        {error, {recoverable_error, _}} ->
            Result;
        {error, {unrecoverable_error, _}} ->
            Result;
        {error, disconnected} ->
            {error, {recoverable_error, disconnected}};
        {error, Reason} ->
            {error, {unrecoverable_error, Reason}};
        {error, socket, closed} ->
            {error, {recoverable_error, Result}};
        {error, Kind, Reason} ->
            {error, {unrecoverable_error, {Kind, Reason}}};
        {ok, [{proc_result, Code, Reason}]} when Code =/= 0 ->
            {error, {unrecoverable_error, {Code, Reason}}};
        _ ->
            Result
    end.

%% Sums the affected rows of a batch result on top of `Total'. Successful
%% procedure results (code 0) add nothing; the first failing procedure result
%% aborts the walk with an unrecoverable error.
handle_batch_result([], Total) ->
    {ok, Total};
handle_batch_result([{affected_rows, Count} | Tail], Total) ->
    handle_batch_result(Tail, Total + Count);
handle_batch_result([{proc_result, 0, _Rows} | Tail], Total) ->
    handle_batch_result(Tail, Total);
handle_batch_result([{proc_result, Code, Reason} | _Tail], _Total) ->
    {error, {unrecoverable_error, {Code, Reason}}}.