%%--------------------------------------------------------------------
%% Copyright (c) 2020-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%%     http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------

-module(emqx_postgresql).

-include("emqx_postgresql.hrl").
-include_lib("emqx_connector/include/emqx_connector.hrl").
-include_lib("typerefl/include/types.hrl").
-include_lib("emqx/include/logger.hrl").
-include_lib("hocon/include/hoconsc.hrl").
-include_lib("epgsql/include/epgsql.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl").

-export([roots/0, fields/1, namespace/0]).

-behaviour(emqx_resource).

%% callbacks of behaviour emqx_resource
-export([
    resource_type/0,
    callback_mode/0,
    on_start/2,
    on_stop/2,
    on_query/3,
    on_batch_query/3,
    on_get_status/2,
    on_add_channel/4,
    on_remove_channel/3,
    on_get_channels/1,
    on_get_channel_status/3,
    on_format_query_result/1
]).

-export([connect/1]).

-export([
    query/3,
    prepared_query/3,
    execute_batch/3
]).

-export([disable_prepared_statements/0]).

%% for ecpool workers usage
-export([do_get_status/1, prepare_sql_to_conn/2]).

-define(PGSQL_HOST_OPTIONS, #{
    default_port => ?PGSQL_DEFAULT_PORT
}).

-type connector_resource_id() :: binary().
-type action_resource_id() :: binary().

-type template() :: {unicode:chardata(), emqx_template_sql:row_template()}.
-type state() ::
    #{
        pool_name := binary(),
        query_templates := #{binary() => template()},
        prepares := disabled | #{binary() => epgsql:statement()} | {error, _}
    }.

%% FIXME: add `{error, sync_required}' to `epgsql:execute_batch'
%% We want to be able to call sync if any message from the backend leaves the driver in an
%% inconsistent state needing sync.
-dialyzer({nowarn_function, [execute_batch/3]}).

%%=====================================================================

namespace() -> postgres.

roots() ->
    [{config, #{type => hoconsc:ref(?MODULE, config)}}].

fields(config) ->
    [
        {server, server()},
        disable_prepared_statements()
    ] ++
        adjust_fields(emqx_connector_schema_lib:relational_db_fields()) ++
        emqx_connector_schema_lib:ssl_fields() ++
        emqx_connector_schema_lib:prepare_statement_fields().

server() ->
    Meta = #{desc => ?DESC("server")},
    emqx_schema:servers_sc(Meta, ?PGSQL_HOST_OPTIONS).

disable_prepared_statements() ->
    {disable_prepared_statements,
        hoconsc:mk(
            boolean(),
            #{
                default => false,
                required => false,
                desc => ?DESC("disable_prepared_statements")
            }
        )}.

adjust_fields(Fields) ->
    lists:map(
        fun
            ({username, Sc}) ->
                %% to please dialyzer...
                Override = #{type => hocon_schema:field_schema(Sc, type), required => true},
                {username, hocon_schema:override(Sc, Override)};
            (Field) ->
                Field
        end,
        Fields
    ).

%% ===================================================================
resource_type() -> pgsql.

callback_mode() -> always_sync.

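%% Starts the connector: parses the server address, starts the ecpool pool for
%% this instance and then lets init_prepare/1 prepare any configured statement
%% templates (skipped when prepared statements are disabled).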
-spec on_start(binary(), hocon:config()) -> {ok, state()} | {error, _}.
on_start(
    InstId,
    #{
        server := Server,
        disable_prepared_statements := DisablePreparedStatements,
        database := DB,
        username := User,
        pool_size := PoolSize,
        ssl := SSL
    } = Config
) ->
    #{hostname := Host, port := Port} = emqx_schema:parse_server(Server, ?PGSQL_HOST_OPTIONS),
    ?SLOG(info, #{
        msg => "starting_postgresql_connector",
        connector => InstId,
        config => emqx_utils:redact(Config)
    }),
    SslOpts =
        case maps:get(enable, SSL) of
            true ->
                [
                    %% note: this is converted to `required' in
                    %% `conn_opts/2', and there's a boolean guard
                    %% there; if this is set to `required' here,
                    %% that'll require changing `conn_opts/2''s guard.
                    {ssl, true},
                    {ssl_opts, emqx_tls_lib:to_client_opts(SSL)}
                ];
            false ->
                [{ssl, false}]
        end,
    Options = [
        {host, Host},
        {port, Port},
        {username, User},
        {password, maps:get(password, Config, emqx_secret:wrap(""))},
        {database, DB},
        {auto_reconnect, ?AUTO_RECONNECT_INTERVAL},
        {pool_size, PoolSize}
    ],
    State1 = parse_sql_template(Config, <<"send_message">>),
    State2 = State1#{installed_channels => #{}},
    case emqx_resource_pool:start(InstId, ?MODULE, Options ++ SslOpts) of
        ok ->
            Prepares =
                case DisablePreparedStatements of
                    true -> disabled;
                    false -> #{}
                end,
            {ok, init_prepare(State2#{pool_name => InstId, prepares => Prepares})};
        {error, Reason} ->
            ?tp(
                pgsql_connector_start_failed,
                #{error => Reason}
            ),
            {error, Reason}
    end.

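%% Stops the connector: closes the worker connections and stops the resource pool.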
on_stop(InstId, State) ->
    ?SLOG(info, #{
        msg => "stopping_postgresql_connector",
        connector => InstId
    }),
    close_connections(State),
    Res = emqx_resource_pool:stop(InstId),
    ?tp(postgres_stopped, #{instance_id => InstId}),
    Res.

close_connections(#{pool_name := PoolName} = _State) ->
    WorkerPids = [Worker || {_WorkerName, Worker} <- ecpool:workers(PoolName)],
    close_connections_with_worker_pids(WorkerPids),
    ok.

close_connections_with_worker_pids([WorkerPid | Rest]) ->
    %% We ignore errors since any error probably means that the
    %% connection is closed already.
    try ecpool_worker:client(WorkerPid) of
        {ok, Conn} ->
            _ = epgsql:close(Conn),
            close_connections_with_worker_pids(Rest);
        _ ->
            close_connections_with_worker_pids(Rest)
    catch
        _:_ ->
            close_connections_with_worker_pids(Rest)
    end;
close_connections_with_worker_pids([]) ->
    ok.

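%% Installs an action channel: parses and prepares the channel's SQL template;
%% the channel is rejected as an unhealthy target if preparation fails.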
on_add_channel(
    _InstId,
    #{
        installed_channels := InstalledChannels
    } = OldState,
    ChannelId,
    ChannelConfig
) ->
    %% The following will throw an exception if the bridge producer fails to start
    {ok, ChannelState} = create_channel_state(ChannelId, OldState, ChannelConfig),
    case ChannelState of
        #{prepares := {error, Reason}} ->
            {error, {unhealthy_target, Reason}};
        _ ->
            NewInstalledChannels = maps:put(ChannelId, ChannelState, InstalledChannels),
            %% Update state
            NewState = OldState#{installed_channels => NewInstalledChannels},
            {ok, NewState}
    end.

create_channel_state(
    ChannelId,
    #{
        pool_name := PoolName,
        prepares := Prepares
    } = _ConnectorState,
    #{parameters := Parameters} = _ChannelConfig
) ->
    State1 = parse_sql_template(Parameters, ChannelId),
    {ok,
        init_prepare(State1#{
            pool_name => PoolName,
            prepares => Prepares,
            prepare_statement => #{}
        })}.

on_remove_channel(
    _InstId,
    #{
        installed_channels := InstalledChannels
    } = OldState,
    ChannelId
) ->
    %% Close prepared statements
    ok = close_prepared_statement(ChannelId, OldState),
    NewInstalledChannels = maps:remove(ChannelId, InstalledChannels),
    %% Update state
    NewState = OldState#{installed_channels => NewInstalledChannels},
    {ok, NewState}.

close_prepared_statement(_ChannelId, #{prepares := disabled}) ->
    ok;
close_prepared_statement(ChannelId, #{pool_name := PoolName} = State) ->
    WorkerPids = [Worker || {_WorkerName, Worker} <- ecpool:workers(PoolName)],
    close_prepared_statement(WorkerPids, ChannelId, State),
    ok.

close_prepared_statement([WorkerPid | Rest], ChannelId, State) ->
    %% We ignore errors since any error probably means that the
    %% prepared statement doesn't exist. If it exists when we try
    %% to insert one with the same name, we will try to remove it
    %% again anyway.
    try ecpool_worker:client(WorkerPid) of
        {ok, Conn} ->
            Statement = get_templated_statement(ChannelId, State),
            _ = epgsql:close(Conn, Statement),
            close_prepared_statement(Rest, ChannelId, State);
        _ ->
            close_prepared_statement(Rest, ChannelId, State)
    catch
        _:_ ->
            close_prepared_statement(Rest, ChannelId, State)
    end;
close_prepared_statement([], _ChannelId, _State) ->
    ok.

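%% Channel health check: verifies that the table referenced by the channel's
%% SQL template still exists on the database.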
on_get_channel_status(
    _ResId,
    ChannelId,
    #{
        pool_name := PoolName,
        installed_channels := Channels
    } = _State
) ->
    ChannelState = maps:get(ChannelId, Channels),
    case
        do_check_channel_sql(
            PoolName,
            ChannelId,
            ChannelState
        )
    of
        ok ->
            connected;
        {error, undefined_table} ->
            {error, {unhealthy_target, <<"Table does not exist">>}}
    end.

do_check_channel_sql(
    PoolName,
    ChannelId,
    #{query_templates := ChannelQueryTemplates} = _ChannelState
) ->
    {SQL, _RowTemplate} = maps:get(ChannelId, ChannelQueryTemplates),
    WorkerPids = [Worker || {_WorkerName, Worker} <- ecpool:workers(PoolName)],
    validate_table_existence(WorkerPids, SQL).

on_get_channels(ResId) ->
    emqx_bridge_v2:get_channels_for_connector(ResId).

-spec on_query
    %% Called from authn and authz modules
    (connector_resource_id(), {prepared_query, binary(), [term()]}, state()) ->
        {ok, _} | {error, term()};
    %% Called from bridges
    (connector_resource_id(), {action_resource_id(), map()}, state()) ->
        {ok, _} | {error, term()}.
on_query(InstId, {TypeOrKey, NameOrMap}, State) ->
    on_query(InstId, {TypeOrKey, NameOrMap, []}, State);
on_query(
    InstId,
    {TypeOrKey, NameOrMap, Params},
    #{pool_name := PoolName} = State
) ->
    ?SLOG(debug, #{
        msg => "postgresql_connector_received_sql_query",
        connector => InstId,
        type => TypeOrKey,
        sql => NameOrMap,
        state => State
    }),
    {QueryType, NameOrSQL2, Data} = proc_sql_params(TypeOrKey, NameOrMap, Params, State),
    emqx_trace:rendered_action_template(
        TypeOrKey,
        #{
            statement_type => QueryType,
            statement_or_name => NameOrSQL2,
            data => Data
        }
    ),
    Res = on_sql_query(InstId, PoolName, QueryType, NameOrSQL2, Data),
    ?tp(postgres_bridge_connector_on_query_return, #{instance_id => InstId, result => Res}),
    handle_result(Res).

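%% Batch path used by actions: renders one parameter row per request from the
%% channel's row template and executes them with a single execute_batch call.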
on_batch_query(
    InstId,
    [{Key, _} = Request | _] = BatchReq,
    #{pool_name := PoolName} = State
) ->
    BinKey = to_bin(Key),
    case get_template(BinKey, State) of
        undefined ->
            Log = #{
                connector => InstId,
                first_request => Request,
                state => State,
                msg => "batch prepare not implemented"
            },
            ?SLOG(error, Log),
            {error, {unrecoverable_error, batch_prepare_not_implemented}};
        {_Statement, RowTemplate} ->
            StatementTemplate = get_templated_statement(BinKey, State),
            Rows = [render_prepare_sql_row(RowTemplate, Data) || {_Key, Data} <- BatchReq],
            emqx_trace:rendered_action_template(
                Key,
                #{
                    statement_type => execute_batch,
                    statement_or_name => StatementTemplate,
                    data => Rows
                }
            ),
            case on_sql_query(InstId, PoolName, execute_batch, StatementTemplate, Rows) of
                {error, _Error} = Result ->
                    handle_result(Result);
                {_Column, Results} ->
                    handle_batch_result(Results, 0)
            end
    end;
on_batch_query(InstId, BatchReq, State) ->
    ?SLOG(error, #{
        connector => InstId,
        request => BatchReq,
        state => State,
        msg => "invalid request"
    }),
    {error, {unrecoverable_error, invalid_request}}.

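%% Normalizes a request into {QueryType, StatementOrSQL, Params}, falling back
%% to plain queries with the rendered SQL when prepared statements are disabled.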
proc_sql_params(ActionResId, #{} = Map, [], State) when is_binary(ActionResId) ->
    %% When this connector is called from actions/bridges.
    DisablePreparedStatements = prepared_statements_disabled(State),
    {ExprTemplate, RowTemplate} = get_template(ActionResId, State),
    Rendered = render_prepare_sql_row(RowTemplate, Map),
    case DisablePreparedStatements of
        true ->
            {query, ExprTemplate, Rendered};
        false ->
            {prepared_query, ActionResId, Rendered}
    end;
proc_sql_params(prepared_query, ConnResId, Params, State) ->
    %% When this connector is called from authn/authz modules
    DisablePreparedStatements = prepared_statements_disabled(State),
    case DisablePreparedStatements of
        true ->
            #{query_templates := #{ConnResId := {ExprTemplate, _VarsTemplate}}} = State,
            {query, ExprTemplate, Params};
        false ->
            %% Connector resource id itself is the prepared statement name
            {prepared_query, ConnResId, Params}
    end;
proc_sql_params(QueryType, SQL, Params, _State) when
    is_atom(QueryType) andalso
        (is_binary(SQL) orelse is_list(SQL)) andalso
        is_list(Params)
->
    %% When called to do ad-hoc commands/queries.
    {QueryType, SQL, Params}.

prepared_statements_disabled(State) ->
    maps:get(prepares, State, #{}) =:= disabled.

get_template(Key, #{installed_channels := Channels} = _State) when is_map_key(Key, Channels) ->
    BinKey = to_bin(Key),
    ChannelState = maps:get(BinKey, Channels),
    ChannelQueryTemplates = maps:get(query_templates, ChannelState),
    maps:get(BinKey, ChannelQueryTemplates);
get_template(Key, #{query_templates := Templates}) ->
    BinKey = to_bin(Key),
    maps:get(BinKey, Templates, undefined).

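%% Returns what should be handed to the driver for the given key: the raw SQL
%% template when prepared statements are disabled, otherwise the prepared
%% statement.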
get_templated_statement(Key, #{installed_channels := Channels} = _State) when
    is_map_key(Key, Channels)
->
    BinKey = to_bin(Key),
    ChannelState = maps:get(BinKey, Channels),
    case ChannelState of
        #{prepares := disabled, query_templates := #{BinKey := {ExprTemplate, _}}} ->
            ExprTemplate;
        #{prepares := #{BinKey := ExprTemplate}} ->
            ExprTemplate
    end;
get_templated_statement(Key, #{prepares := PrepStatements}) ->
    BinKey = to_bin(Key),
    maps:get(BinKey, PrepStatements).

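%% Picks a pool worker and runs the query/prepared_query/execute_batch call on
%% it, classifying failures as recoverable or unrecoverable for the resource
%% framework.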
on_sql_query(InstId, PoolName, Type, NameOrSQL, Data) ->
    try ecpool:pick_and_do(PoolName, {?MODULE, Type, [NameOrSQL, Data]}, no_handover) of
        {error, Reason} ->
            ?tp(
                pgsql_connector_query_return,
                #{error => Reason}
            ),
            TranslatedError = translate_to_log_context(Reason),
            ?SLOG(
                error,
                maps:merge(
                    #{
                        msg => "postgresql_connector_do_sql_query_failed",
                        connector => InstId,
                        type => Type,
                        sql => NameOrSQL
                    },
                    TranslatedError
                )
            ),
            case Reason of
                sync_required ->
                    {error, {recoverable_error, Reason}};
                ecpool_empty ->
                    {error, {recoverable_error, Reason}};
                {error, error, _, undefined_table, _, _} ->
                    {error, {unrecoverable_error, export_error(TranslatedError)}};
                _ ->
                    {error, export_error(TranslatedError)}
            end;
        Result ->
            ?tp(
                pgsql_connector_query_return,
                #{result => Result}
            ),
            Result
    catch
        error:function_clause:Stacktrace ->
            ?SLOG(error, #{
                msg => "postgresql_connector_do_sql_query_failed",
                connector => InstId,
                type => Type,
                sql => NameOrSQL,
                reason => function_clause,
                stacktrace => Stacktrace
            }),
            {error, {unrecoverable_error, invalid_request}}
    end.

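%% Connector health check: pings every worker and re-validates (or re-prepares)
%% the configured statements.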
on_get_status(_InstId, #{pool_name := PoolName} = State) ->
    case emqx_resource_pool:health_check_workers(PoolName, fun ?MODULE:do_get_status/1) of
        true ->
            case do_check_prepares(State) of
                ok ->
                    connected;
                {ok, NState} ->
                    %% return new state with prepared statements
                    {connected, NState};
                {error, undefined_table} ->
                    %% we are connected, but the target table is not created:
                    %% report disconnected with an unhealthy_target reason
                    {disconnected, State, unhealthy_target};
                {error, _Reason} ->
                    %% do not log error, it is logged in prepare_sql_to_conn
                    connecting
            end;
        false ->
            connecting
    end.

do_get_status(Conn) ->
    ok == element(1, epgsql:squery(Conn, "SELECT count(1) AS T")).

do_check_prepares(
    #{
        pool_name := PoolName,
        query_templates := #{<<"send_message">> := {SQL, _RowTemplate}}
    }
) ->
    WorkerPids = [Worker || {_WorkerName, Worker} <- ecpool:workers(PoolName)],
    case validate_table_existence(WorkerPids, SQL) of
        ok ->
            ok;
        {error, Reason} ->
            {error, Reason}
    end;
do_check_prepares(#{prepares := disabled}) ->
    ok;
do_check_prepares(#{prepares := Prepares}) when is_map(Prepares) ->
    ok;
do_check_prepares(#{prepares := {error, _}} = State) ->
    %% retry to prepare
    case prepare_sql(State) of
        {ok, PrepStatements} ->
            %% remove the error
            {ok, State#{prepares := PrepStatements}};
        {error, Reason} ->
            {error, Reason}
    end.

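%% Asks the workers to parse the statement without executing it; only an
%% explicit undefined_table error marks the target table as missing.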
-spec validate_table_existence([pid()], binary()) -> ok | {error, undefined_table}.
validate_table_existence([WorkerPid | Rest], SQL) ->
    try ecpool_worker:client(WorkerPid) of
        {ok, Conn} ->
            case epgsql:parse2(Conn, "", SQL, []) of
                {error, {_, _, _, undefined_table, _, _}} ->
                    {error, undefined_table};
                Res when is_tuple(Res) andalso ok == element(1, Res) ->
                    ok;
                Res ->
                    ?tp(postgres_connector_bad_parse2, #{result => Res}),
                    validate_table_existence(Rest, SQL)
            end;
        _ ->
            validate_table_existence(Rest, SQL)
    catch
        exit:{noproc, _} ->
            validate_table_existence(Rest, SQL)
    end;
validate_table_existence([], _SQL) ->
    %% All workers were unavailable or replied with an unexpected error;
    %% we will retry on the next health check.
    ok.

%% ===================================================================

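%% ecpool callback: opens a single epgsql connection using the options
%% accumulated by conn_opts/2.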
connect(Opts) ->
    Host = proplists:get_value(host, Opts),
    Username = proplists:get_value(username, Opts),
    %% TODO: teach `epgsql` to accept 0-arity closures as passwords.
    Password = emqx_secret:unwrap(proplists:get_value(password, Opts)),
    case epgsql:connect(Host, Username, Password, conn_opts(Opts)) of
        {ok, _Conn} = Ok ->
            Ok;
        {error, Reason} ->
            {error, Reason}
    end.

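%% Thin wrappers around the epgsql calls used via ecpool:pick_and_do/3. On
%% {error, sync_required} the connection is synced so that subsequent queries
%% are not rejected, and the error is returned to the caller.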
query(Conn, SQL, Params) ->
    case epgsql:equery(Conn, SQL, Params) of
        {error, sync_required} = Res ->
            ok = epgsql:sync(Conn),
            Res;
        Res ->
            Res
    end.

prepared_query(Conn, Name, Params) ->
    case epgsql:prepared_query2(Conn, Name, Params) of
        {error, sync_required} = Res ->
            ok = epgsql:sync(Conn),
            Res;
        Res ->
            Res
    end.

execute_batch(Conn, Statement, Params) ->
    case epgsql:execute_batch(Conn, Statement, Params) of
        {error, sync_required} = Res ->
            ok = epgsql:sync(Conn),
            Res;
        Res ->
            Res
    end.

conn_opts(Opts) ->
    conn_opts(Opts, []).

conn_opts([], Acc) ->
    Acc;
conn_opts([Opt = {database, _} | Opts], Acc) ->
    conn_opts(Opts, [Opt | Acc]);
conn_opts([{ssl, Bool} | Opts], Acc) when is_boolean(Bool) ->
    Flag =
        case Bool of
            true -> required;
            false -> false
        end,
    conn_opts(Opts, [{ssl, Flag} | Acc]);
conn_opts([Opt = {port, _} | Opts], Acc) ->
    conn_opts(Opts, [Opt | Acc]);
conn_opts([Opt = {timeout, _} | Opts], Acc) ->
    conn_opts(Opts, [Opt | Acc]);
conn_opts([Opt = {ssl_opts, _} | Opts], Acc) ->
    conn_opts(Opts, [Opt | Acc]);
conn_opts([_Opt | Opts], Acc) ->
    conn_opts(Opts, Acc).

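%% Builds the query_templates map from the configuration: either the
%% prepare_statement map or the single `sql' entry keyed by SQLID.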
parse_sql_template(Config, SQLID) ->
    Queries =
        case Config of
            #{prepare_statement := Qs} ->
                Qs;
            #{sql := Query} ->
                #{SQLID => Query};
            #{} ->
                #{}
        end,
    Templates = maps:fold(fun parse_sql_template/3, #{}, Queries),
    #{query_templates => Templates}.

parse_sql_template(Key, Query, Acc) ->
    Template = emqx_template_sql:parse_prepstmt(Query, #{parameters => '$n'}),
    Acc#{Key => Template}.

render_prepare_sql_row(RowTemplate, Data) ->
    % NOTE: ignoring errors here, missing variables will be replaced with `null`.
    {Row, _Errors} = emqx_template_sql:render_prepstmt(RowTemplate, {emqx_jsonish, Data}),
    Row.

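%% Prepares all templates on the pool unless prepared statements are disabled
%% or there is nothing to prepare; on failure the error is stored under
%% `prepares' so that health checks can retry later.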
init_prepare(State = #{prepares := disabled}) ->
    State;
init_prepare(State = #{query_templates := Templates}) when map_size(Templates) == 0 ->
    State;
init_prepare(State = #{}) ->
    case prepare_sql(State) of
        {ok, PrepStatements} ->
            State#{prepares => PrepStatements};
        Error ->
            TranslatedError = translate_to_log_context(Error),
            ?SLOG(
                error,
                maps:merge(
                    #{msg => <<"postgresql_init_prepare_statement_failed">>},
                    TranslatedError
                )
            ),
            %% mark the prepares failed
            State#{prepares => {error, export_error(TranslatedError)}}
    end.

prepare_sql(#{query_templates := Templates, pool_name := PoolName}) ->
    prepare_sql(maps:to_list(Templates), PoolName).

prepare_sql(Templates, PoolName) ->
    case do_prepare_sql(Templates, PoolName) of
        {ok, _Sts} = Ok ->
            %% prepare for reconnect
            ecpool:add_reconnect_callback(PoolName, {?MODULE, prepare_sql_to_conn, [Templates]}),
            Ok;
        Error ->
            Error
    end.

do_prepare_sql(Templates, PoolName) ->
    do_prepare_sql(ecpool:workers(PoolName), Templates, #{}).

do_prepare_sql([{_Name, Worker} | Rest], Templates, _LastSts) ->
    {ok, Conn} = ecpool_worker:client(Worker),
    case prepare_sql_to_conn(Conn, Templates) of
        {ok, Sts} ->
            do_prepare_sql(Rest, Templates, Sts);
        Error ->
            Error
    end;
do_prepare_sql([], _Prepares, LastSts) ->
    {ok, LastSts}.

prepare_sql_to_conn(Conn, Prepares) ->
    prepare_sql_to_conn(Conn, Prepares, #{}, 0).

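%% Prepares each template on one connection. A duplicate_prepared_statement
%% error is handled by closing the old statement and retrying once; a second
%% consecutive failure gives up with an instructive error.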
prepare_sql_to_conn(Conn, [], Statements, _Attempts) when is_pid(Conn) ->
    {ok, Statements};
prepare_sql_to_conn(Conn, [{_Key, _} | _Rest], _Statements, _MaxAttempts = 2) when is_pid(Conn) ->
    failed_to_remove_prev_prepared_statement_error();
prepare_sql_to_conn(
    Conn, [{Key, {SQL, _RowTemplate}} | Rest] = ToPrepare, Statements, Attempts
) when is_pid(Conn) ->
    LogMeta = #{msg => "postgresql_prepare_statement", name => Key, sql => SQL},
    ?SLOG(info, LogMeta),
    case epgsql:parse2(Conn, Key, SQL, []) of
        {ok, Statement} ->
            prepare_sql_to_conn(Conn, Rest, Statements#{Key => Statement}, 0);
        {error, #error{severity = error, codename = undefined_table} = Error} ->
            %% Target table is not created
            ?tp(pgsql_undefined_table, #{}),
            LogMsg =
                maps:merge(
                    LogMeta#{msg => "postgresql_parse_failed"},
                    translate_to_log_context(Error)
                ),
            ?SLOG(error, LogMsg),
            {error, undefined_table};
        {error, #error{severity = error, codename = duplicate_prepared_statement}} = Error ->
            ?tp(pgsql_prepared_statement_exists, #{}),
            LogMsg =
                maps:merge(
                    LogMeta#{
                        msg => "postgresql_prepared_statement_with_same_name_already_exists",
                        explain => <<
                            "A prepared statement with the same name already "
                            "exists in the driver. Will attempt to remove the "
                            "previous prepared statement with the name and then "
                            "try again."
                        >>
                    },
                    translate_to_log_context(Error)
                ),
            ?SLOG(warning, LogMsg),
            case epgsql:close(Conn, statement, Key) of
                ok ->
                    ?SLOG(info, #{msg => "pgsql_closed_statement_successfully"}),
                    prepare_sql_to_conn(Conn, ToPrepare, Statements, Attempts + 1);
                {error, CloseError} ->
                    ?SLOG(error, #{msg => "pgsql_close_statement_failed", cause => CloseError}),
                    failed_to_remove_prev_prepared_statement_error()
            end;
        {error, Error} ->
            TranslatedError = translate_to_log_context(Error),
            LogMsg =
                maps:merge(
                    LogMeta#{msg => "postgresql_parse_failed"},
                    TranslatedError
                ),
            ?SLOG(error, LogMsg),
            {error, export_error(TranslatedError)}
    end.

failed_to_remove_prev_prepared_statement_error() ->
    Msg =
        ("A previous prepared statement for the action already exists "
        "but cannot be closed. Please, try to disable and then enable "
        "the connector to resolve this issue."),
    {error, unicode:characters_to_binary(Msg)}.

to_bin(Bin) when is_binary(Bin) ->
    Bin;
to_bin(Atom) when is_atom(Atom) ->
    erlang:atom_to_binary(Atom).

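%% Maps driver results and errors onto the resource framework's
%% recoverable/unrecoverable error convention.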
handle_result({error, {recoverable_error, _Error}} = Res) ->
    Res;
handle_result({error, {unrecoverable_error, _Error}} = Res) ->
    Res;
handle_result({error, disconnected}) ->
    {error, {recoverable_error, disconnected}};
handle_result({error, Error}) ->
    TranslatedError = translate_to_log_context(Error),
    {error, {unrecoverable_error, export_error(TranslatedError)}};
handle_result(Res) ->
    Res.

on_format_query_result({ok, Cnt}) when is_integer(Cnt) ->
    #{result => ok, affected_rows => Cnt};
on_format_query_result(Res) ->
    Res.

handle_batch_result([{ok, Count} | Rest], Acc) ->
    handle_batch_result(Rest, Acc + Count);
handle_batch_result([{error, Error} | _Rest], _Acc) ->
    TranslatedError = translate_to_log_context(Error),
    {error, {unrecoverable_error, export_error(TranslatedError)}};
handle_batch_result([], Acc) ->
    ?tp("postgres_success_batch_result", #{row_count => Acc}),
    {ok, Acc}.

translate_to_log_context({error, Reason}) ->
    translate_to_log_context(Reason);
translate_to_log_context(#error{} = Reason) ->
    #error{
        severity = Severity,
        code = Code,
        codename = Codename,
        message = Message,
        extra = Extra
    } = Reason,
    #{
        driver_severity => Severity,
        driver_error_codename => Codename,
        driver_error_code => Code,
        driver_error_message => emqx_logger_textfmt:try_format_unicode(Message),
        driver_error_extra => Extra
    };
translate_to_log_context(Reason) ->
    #{reason => Reason}.

export_error(#{
    driver_severity := Severity,
    driver_error_codename := Codename,
    driver_error_code := Code
}) ->
    %% Extra information has already been logged.
    #{
        error_code => Code,
        error_codename => Codename,
        severity => Severity
    };
export_error(#{reason := Reason}) ->
    Reason;
export_error(Error) ->
    Error.