emqx/apps/emqx_oracle/src/emqx_oracle.erl

580 lines
19 KiB
Erlang

%%--------------------------------------------------------------------
%% Copyright (c) 2023-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_oracle).
-behaviour(emqx_resource).
-include_lib("emqx_resource/include/emqx_resource.hrl").
-include_lib("emqx/include/logger.hrl").
-include_lib("emqx/include/emqx_trace.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
-define(UNHEALTHY_TARGET_MSG,
"Oracle table is invalid. Please check if the table exists in Oracle Database."
).
%%====================================================================
%% Exports
%%====================================================================
%% callbacks for behaviour emqx_resource
-export([
resource_type/0,
callback_mode/0,
on_start/2,
on_stop/2,
on_query/3,
on_batch_query/3,
on_get_status/2,
on_add_channel/4,
on_remove_channel/3,
on_get_channels/1,
on_get_channel_status/3
]).
%% callbacks for ecpool
-export([connect/1, prepare_sql_to_conn/3]).
%% Internal exports used to execute code with ecpool worker
-export([
query/3,
execute_batch/3,
do_get_status/1
]).
-export([
oracle_host_options/0
]).
-define(ORACLE_DEFAULT_PORT, 1521).
-define(SYNC_QUERY_MODE, no_handover).
-define(DEFAULT_POOL_SIZE, 8).
-define(OPT_TIMEOUT, 30000).
-define(MAX_CURSORS, 10).
-define(ORACLE_HOST_OPTIONS, #{
default_port => ?ORACLE_DEFAULT_PORT
}).
-type prepares() :: #{atom() => binary()}.
-type params_tokens() :: #{atom() => list()}.
-type state() ::
#{
pool_name := binary(),
prepare_sql := prepares(),
params_tokens := params_tokens(),
batch_params_tokens := params_tokens()
}.
resource_type() -> oracle.
% As ecpool is not monitoring the worker's PID when doing a handover_async, the
% request can be lost if worker crashes. Thus, it's better to force requests to
% be sync for now.
callback_mode() -> always_sync.
-spec on_start(binary(), hocon:config()) -> {ok, state()} | {error, _}.
on_start(
InstId,
#{
server := Server,
username := User
} = Config
) ->
?SLOG(info, #{
msg => "starting_oracle_connector",
connector => InstId,
config => emqx_utils:redact(Config)
}),
?tp(oracle_bridge_started, #{instance_id => InstId, config => Config}),
{ok, _} = application:ensure_all_started(ecpool),
{ok, _} = application:ensure_all_started(jamdb_oracle),
jamdb_oracle_conn:set_max_cursors_number(?MAX_CURSORS),
#{hostname := Host, port := Port} = emqx_schema:parse_server(Server, oracle_host_options()),
Sid = maps:get(sid, Config, ""),
ServiceName =
case maps:get(service_name, Config, undefined) of
undefined -> undefined;
ServiceName0 -> emqx_utils_conv:str(ServiceName0)
end,
Options = [
{host, Host},
{port, Port},
{user, emqx_utils_conv:str(User)},
{password, maps:get(password, Config, "")},
{sid, emqx_utils_conv:str(Sid)},
{service_name, ServiceName},
{pool_size, maps:get(pool_size, Config, ?DEFAULT_POOL_SIZE)},
{timeout, ?OPT_TIMEOUT},
{app_name, "EMQX Data To Oracle Database Action"}
],
PoolName = InstId,
State = #{
pool_name => PoolName,
installed_channels => #{}
},
case emqx_resource_pool:start(InstId, ?MODULE, Options) of
ok ->
{ok, State};
{error, Reason} ->
?tp(
oracle_connector_start_failed,
#{error => Reason}
),
{error, Reason}
end.
on_stop(InstId, #{pool_name := PoolName}) ->
?SLOG(info, #{
msg => "stopping_oracle_connector",
connector => InstId
}),
?tp(oracle_bridge_stopped, #{instance_id => InstId}),
emqx_resource_pool:stop(PoolName).
on_add_channel(
_InstId,
#{
installed_channels := InstalledChannels,
pool_name := PoolName
} = OldState,
ChannelId,
ChannelConfig
) ->
{ok, ChannelState} = create_channel_state(ChannelId, PoolName, ChannelConfig),
NewInstalledChannels = maps:put(ChannelId, ChannelState, InstalledChannels),
%% Update state
NewState = OldState#{installed_channels => NewInstalledChannels},
{ok, NewState}.
create_channel_state(
ChannelId,
PoolName,
#{parameters := Conf} = _ChannelConfig
) ->
State0 = parse_prepare_sql(ChannelId, Conf),
State1 = init_prepare(PoolName, State0),
{ok, State1}.
on_remove_channel(
_InstId,
#{
installed_channels := InstalledChannels
} = OldState,
ChannelId
) ->
NewInstalledChannels = maps:remove(ChannelId, InstalledChannels),
%% Update state
NewState = OldState#{installed_channels => NewInstalledChannels},
{ok, NewState}.
on_get_channel_status(
_ResId,
ChannelId,
#{
pool_name := PoolName,
installed_channels := Channels
} = _State
) ->
State = maps:get(ChannelId, Channels),
case do_check_prepares(ChannelId, PoolName, State) of
ok ->
?status_connected;
{error, undefined_table} ->
%% return new state indicating that we are connected but the target table is not created
{?status_disconnected, {unhealthy_target, ?UNHEALTHY_TARGET_MSG}};
{error, _Reason} ->
%% do not log error, it is logged in prepare_sql_to_conn
?status_connecting
end.
on_get_channels(ResId) ->
emqx_bridge_v2:get_channels_for_connector(ResId).
on_query(InstId, {TypeOrKey, NameOrSQL}, #{pool_name := _PoolName} = State) ->
on_query(InstId, {TypeOrKey, NameOrSQL, []}, State);
on_query(
InstId,
{TypeOrKey, NameOrSQL, Params},
#{
pool_name := PoolName,
installed_channels := Channels
} = _ConnectorState
) ->
State = maps:get(TypeOrKey, Channels, #{}),
?SLOG(debug, #{
msg => "oracle_connector_received_sql_query",
connector => InstId,
type => TypeOrKey,
sql => NameOrSQL,
state => State
}),
Type = query,
{NameOrSQL2, Data} = proc_sql_params(TypeOrKey, NameOrSQL, Params, State),
Res = on_sql_query(InstId, TypeOrKey, PoolName, Type, ?SYNC_QUERY_MODE, NameOrSQL2, Data),
handle_result(Res).
on_batch_query(
InstId,
BatchReq,
#{
pool_name := PoolName,
installed_channels := Channels
} = ConnectorState
) ->
case BatchReq of
[{Key, _} = Request | _] ->
BinKey = to_bin(Key),
State = maps:get(BinKey, Channels),
#{
params_tokens := Tokens,
prepare_sql := Sts
} = State,
case maps:get(BinKey, Tokens, undefined) of
undefined ->
Log = #{
connector => InstId,
first_request => Request,
state => State,
msg => "batch_prepare_not_implemented"
},
?SLOG(error, Log),
{error, {unrecoverable_error, batch_prepare_not_implemented}};
TokenList ->
{_, Datas} = lists:unzip(BatchReq),
Datas2 = [emqx_placeholder:proc_sql(TokenList, Data) || Data <- Datas],
St = maps:get(BinKey, Sts),
case
on_sql_query(
InstId, BinKey, PoolName, execute_batch, ?SYNC_QUERY_MODE, St, Datas2
)
of
{ok, Results} ->
handle_batch_result(Results, 0);
Result ->
Result
end
end;
_ ->
Log = #{
connector => InstId,
request => BatchReq,
state => ConnectorState,
msg => "invalid_request"
},
?SLOG(error, Log),
{error, {unrecoverable_error, invalid_request}}
end.
proc_sql_params(query, SQLOrKey, Params, _State) ->
{SQLOrKey, Params};
proc_sql_params(TypeOrKey, SQLOrData, Params, #{
params_tokens := ParamsTokens, prepare_sql := PrepareSql
}) ->
Key = to_bin(TypeOrKey),
case maps:get(Key, ParamsTokens, undefined) of
undefined ->
{SQLOrData, Params};
Tokens ->
case maps:get(Key, PrepareSql, undefined) of
undefined ->
{SQLOrData, Params};
Sql ->
{Sql, emqx_placeholder:proc_sql(Tokens, SQLOrData)}
end
end.
on_sql_query(InstId, ChannelID, PoolName, Type, ApplyMode, NameOrSQL, Data) ->
emqx_trace:rendered_action_template(ChannelID, #{
type => Type,
apply_mode => ApplyMode,
name_or_sql => NameOrSQL,
data => #emqx_trace_format_func_data{function = fun trace_format_data/1, data = Data}
}),
case ecpool:pick_and_do(PoolName, {?MODULE, Type, [NameOrSQL, Data]}, ApplyMode) of
{error, Reason} = Result ->
?tp(
oracle_connector_query_return,
#{error => Reason}
),
?SLOG(error, #{
msg => "oracle_connector_do_sql_query_failed",
connector => InstId,
type => Type,
sql => NameOrSQL,
reason => Reason
}),
case Reason of
ecpool_empty ->
{error, {recoverable_error, Reason}};
_ ->
Result
end;
Result ->
?tp(
oracle_connector_query_return,
#{result => Result}
),
Result
end.
trace_format_data(Data0) ->
%% In batch request, we get a two level list
{'$array$', lists:map(fun insert_array_marker_if_list/1, Data0)}.
insert_array_marker_if_list(List) when is_list(List) ->
{'$array$', List};
insert_array_marker_if_list(Item) ->
Item.
on_get_status(_InstId, #{pool_name := Pool} = _State) ->
case emqx_resource_pool:health_check_workers(Pool, fun ?MODULE:do_get_status/1) of
true ->
?status_connected;
false ->
?status_disconnected
end.
do_get_status(Conn) ->
ok == element(1, jamdb_oracle:sql_query(Conn, "select 1 from dual")).
do_check_prepares(
_ChannelId,
_PoolName,
#{
prepare_sql := {error, _Prepares}
} = _State
) ->
{error, undefined_table};
do_check_prepares(
ChannelId,
PoolName,
State
) ->
#{
prepare_sql := #{ChannelId := SQL},
params_tokens := #{ChannelId := Tokens}
} = State,
% it's already connected. Verify if target table still exists
Workers = [Worker || {_WorkerName, Worker} <- ecpool:workers(PoolName)],
lists:foldl(
fun
(WorkerPid, ok) ->
case ecpool_worker:client(WorkerPid) of
{ok, Conn} ->
case check_if_table_exists(Conn, SQL, Tokens) of
{error, undefined_table} -> {error, undefined_table};
_ -> ok
end;
_ ->
ok
end;
(_, Acc) ->
Acc
end,
ok,
Workers
).
%% ===================================================================
oracle_host_options() ->
?ORACLE_HOST_OPTIONS.
connect(Opts) ->
jamdb_oracle:start_link(Opts).
sql_query_to_str(SqlQuery) ->
emqx_utils_conv:str(SqlQuery).
sql_params_to_str(Params) when is_list(Params) ->
lists:map(
fun
(false) -> "0";
(true) -> "1";
(Value) -> emqx_utils_conv:str(Value)
end,
Params
).
query(Conn, SQL, Params) ->
Ret = jamdb_oracle:sql_query(Conn, {sql_query_to_str(SQL), sql_params_to_str(Params)}),
?tp(oracle_query, #{conn => Conn, sql => SQL, params => Params, result => Ret}),
handle_result(Ret).
execute_batch(Conn, SQL, ParamsList) ->
ParamsListStr = lists:map(fun sql_params_to_str/1, ParamsList),
Ret = jamdb_oracle:sql_query(Conn, {batch, sql_query_to_str(SQL), ParamsListStr}),
?tp(oracle_batch_query, #{conn => Conn, sql => SQL, params => ParamsList, result => Ret}),
handle_result(Ret).
parse_prepare_sql(ChannelId, Config) ->
SQL =
case maps:get(prepare_statement, Config, undefined) of
undefined ->
case maps:get(sql, Config, undefined) of
undefined -> #{};
Template -> #{ChannelId => Template}
end;
Any ->
Any
end,
parse_prepare_sql(maps:to_list(SQL), #{}, #{}).
parse_prepare_sql([{Key, H} | T], Prepares, Tokens) ->
{PrepareSQL, ParamsTokens} = emqx_placeholder:preproc_sql(H, ':n'),
parse_prepare_sql(
T, Prepares#{Key => PrepareSQL}, Tokens#{Key => ParamsTokens}
);
parse_prepare_sql([], Prepares, Tokens) ->
#{
prepare_sql => Prepares,
params_tokens => Tokens
}.
init_prepare(PoolName, State = #{prepare_sql := Prepares, params_tokens := TokensMap}) ->
case prepare_sql(Prepares, PoolName, TokensMap) of
{ok, Sts} ->
State#{prepare_sql := Sts};
Error ->
LogMeta = #{
msg => <<"oracle_init_prepare_statement_failed">>, error => Error
},
?SLOG(error, LogMeta),
%% mark the prepare_sql as failed
State#{prepare_sql => {error, Prepares}}
end.
prepare_sql(Prepares, PoolName, TokensMap) when is_map(Prepares) ->
prepare_sql(maps:to_list(Prepares), PoolName, TokensMap);
prepare_sql(Prepares, PoolName, TokensMap) ->
case do_prepare_sql(Prepares, PoolName, TokensMap) of
{ok, _Sts} = Ok ->
%% prepare for reconnect
ecpool:add_reconnect_callback(PoolName, {?MODULE, prepare_sql_to_conn, [Prepares]}),
Ok;
Error ->
Error
end.
do_prepare_sql(Prepares, PoolName, TokensMap) ->
do_prepare_sql(ecpool:workers(PoolName), Prepares, PoolName, TokensMap, #{}).
do_prepare_sql([{_Name, Worker} | T], Prepares, PoolName, TokensMap, _LastSts) ->
{ok, Conn} = ecpool_worker:client(Worker),
case prepare_sql_to_conn(Conn, Prepares, TokensMap) of
{ok, Sts} ->
do_prepare_sql(T, Prepares, PoolName, TokensMap, Sts);
Error ->
Error
end;
do_prepare_sql([], _Prepares, _PoolName, _TokensMap, LastSts) ->
{ok, LastSts}.
prepare_sql_to_conn(Conn, Prepares, TokensMap) ->
prepare_sql_to_conn(Conn, Prepares, TokensMap, #{}).
prepare_sql_to_conn(Conn, [], _TokensMap, Statements) when is_pid(Conn) -> {ok, Statements};
prepare_sql_to_conn(Conn, [{Key, SQL} | PrepareList], TokensMap, Statements) when is_pid(Conn) ->
LogMeta = #{msg => "oracle_prepare_statement", name => Key, prepare_sql => SQL},
Tokens = maps:get(Key, TokensMap, []),
?SLOG(info, LogMeta),
case check_if_table_exists(Conn, SQL, Tokens) of
ok ->
?SLOG(info, LogMeta#{result => success}),
prepare_sql_to_conn(Conn, PrepareList, TokensMap, Statements#{Key => SQL});
{error, undefined_table} = Error ->
%% Target table is not created
?SLOG(error, LogMeta#{result => failed, reason => "table does not exist"}),
?tp(oracle_undefined_table, #{}),
Error;
Error ->
Error
end.
check_if_table_exists(Conn, SQL, Tokens0) ->
% Discard nested tokens for checking if table exist. As payload here is defined as
% a single string, it would fail if Token is, for instance, ${payload.msg}, causing
% bridge probe to fail.
Tokens = lists:map(
fun
({var, [Token | _DiscardedDeepTokens]}) ->
{var, [Token]};
(Token) ->
Token
end,
Tokens0
),
{Event, _Headers} = emqx_rule_events:eventmsg_publish(
emqx_message:make(<<"t/opic">>, "test query")
),
SqlQuery = "begin " ++ binary_to_list(SQL) ++ "; rollback; end;",
Params = emqx_placeholder:proc_sql(Tokens, Event),
case jamdb_oracle:sql_query(Conn, {SqlQuery, Params}) of
{ok, [{proc_result, 0, _Description}]} ->
ok;
{ok, [{proc_result, 942, _Description}]} ->
%% Target table is not created
{error, undefined_table};
{ok, [{proc_result, _, Description}]} ->
% only the last result is returned, so we need to check on description if it
% contains the "Table doesn't exist" error as it can not be the last one.
% (for instance, the ORA-06550 can be the result value when table does not exist)
ErrorCodes =
case re:run(Description, <<"(ORA-[0-9]+)">>, [global, {capture, first, binary}]) of
{match, OraCodes} -> OraCodes;
_ -> []
end,
OraMap = maps:from_keys([ErrorCode || [ErrorCode] <- ErrorCodes], true),
case OraMap of
_ when is_map_key(<<"ORA-00942">>, OraMap) ->
% ORA-00942: table or view does not exist
{error, undefined_table};
_ when is_map_key(<<"ORA-00932">>, OraMap) ->
% ORA-00932: inconsistent datatypes
% There is a some type inconsistency with table definition but
% table does exist. Probably this inconsistency was caused by
% token discarding in this test query.
ok;
_ ->
{error, Description}
end;
Reason ->
{error, Reason}
end.
to_bin(Bin) when is_binary(Bin) ->
Bin;
to_bin(Atom) when is_atom(Atom) ->
erlang:atom_to_binary(Atom).
handle_result({error, {recoverable_error, _Error}} = Res) ->
Res;
handle_result({error, {unrecoverable_error, _Error}} = Res) ->
Res;
handle_result({error, disconnected}) ->
{error, {recoverable_error, disconnected}};
handle_result({error, Error}) ->
{error, {unrecoverable_error, Error}};
handle_result({error, socket, closed} = Error) ->
{error, {recoverable_error, Error}};
handle_result({error, Type, Reason}) ->
{error, {unrecoverable_error, {Type, Reason}}};
handle_result({ok, [{proc_result, RetCode, Reason}]}) when RetCode =/= 0 ->
{error, {unrecoverable_error, {RetCode, Reason}}};
handle_result(Res) ->
Res.
handle_batch_result([{affected_rows, RowCount} | Rest], Acc) ->
handle_batch_result(Rest, Acc + RowCount);
handle_batch_result([{proc_result, RetCode, _Rows} | Rest], Acc) when RetCode =:= 0 ->
handle_batch_result(Rest, Acc);
handle_batch_result([{proc_result, RetCode, Reason} | _Rest], _Acc) ->
{error, {unrecoverable_error, {RetCode, Reason}}};
handle_batch_result([], Acc) ->
{ok, Acc}.