Merge pull request #10645 from paulozulato/data-bridge-target-unavailable

Data bridge target unavailable
This commit is contained in:
Paulo Zulato 2023-06-21 18:19:23 -03:00 committed by GitHub
commit 62d3766726
13 changed files with 517 additions and 83 deletions

View File

@@ -1,6 +1,6 @@
 %% -*- mode: erlang; -*-
 {erl_opts, [debug_info]}.
-{deps, [ {wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "1.7.5"}}}
+{deps, [ {wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "1.7.6"}}}
        , {kafka_protocol, {git, "https://github.com/kafka4beam/kafka_protocol.git", {tag, "4.1.3"}}}
        , {brod_gssapi, {git, "https://github.com/kafka4beam/brod_gssapi.git", {tag, "v0.1.0"}}}
        , {brod, {git, "https://github.com/kafka4beam/brod.git", {tag, "3.16.8"}}}

View File

@@ -73,6 +73,12 @@ on_start(InstId, Config) ->
         sasl => emqx_bridge_kafka_impl:sasl(Auth),
         ssl => ssl(SSL)
     },
+    case do_get_topic_status(Hosts, KafkaConfig, KafkaTopic) of
+        unhealthy_target ->
+            throw(unhealthy_target);
+        _ ->
+            ok
+    end,
     case wolff:ensure_supervised_client(ClientId, Hosts, ClientConfig) of
         {ok, _} ->
             ?SLOG(info, #{
@@ -108,7 +114,9 @@ on_start(InstId, Config) ->
                 kafka_topic => KafkaTopic,
                 producers => Producers,
                 resource_id => ResourceId,
-                sync_query_timeout => SyncQueryTimeout
+                sync_query_timeout => SyncQueryTimeout,
+                hosts => Hosts,
+                kafka_config => KafkaConfig
             }};
         {error, Reason2} ->
             ?SLOG(error, #{
@@ -131,6 +139,7 @@ on_start(InstId, Config) ->
                     client_id => ClientId
                 }
             ),
+            throw(
                 "Failed to start Kafka client. Please check the logs for errors and check"
                 " the connection parameters."
@@ -294,34 +303,60 @@ on_kafka_ack(_Partition, buffer_overflow_discarded, _Callback) ->
 %% Note: since wolff client has its own replayq that is not managed by
 %% `emqx_resource_buffer_worker', we must avoid returning `disconnected' here. Otherwise,
 %% `emqx_resource_manager' will kill the wolff producers and messages might be lost.
-on_get_status(_InstId, #{client_id := ClientId, kafka_topic := KafkaTopic}) ->
+on_get_status(_InstId, #{client_id := ClientId} = State) ->
     case wolff_client_sup:find_client(ClientId) of
         {ok, Pid} ->
-            do_get_status(Pid, KafkaTopic);
+            case do_get_status(Pid, State) of
+                ok -> connected;
+                unhealthy_target -> {disconnected, State, unhealthy_target};
+                error -> connecting
+            end;
         {error, _Reason} ->
             connecting
     end.
 
-do_get_status(Client, KafkaTopic) ->
-    %% TODO: add a wolff_producers:check_connectivity
+do_get_status(Client, #{kafka_topic := KafkaTopic, hosts := Hosts, kafka_config := KafkaConfig}) ->
+    case do_get_topic_status(Hosts, KafkaConfig, KafkaTopic) of
+        unhealthy_target ->
+            unhealthy_target;
+        _ ->
+            case do_get_healthy_leaders(Client, KafkaTopic) of
+                [] -> error;
+                _ -> ok
+            end
+    end.
+
+do_get_healthy_leaders(Client, KafkaTopic) ->
     case wolff_client:get_leader_connections(Client, KafkaTopic) of
         {ok, Leaders} ->
-            %% Kafka is considered healthy as long as any of the partition leader is reachable
-            case
-                lists:any(
-                    fun({_Partition, Pid}) ->
-                        is_pid(Pid) andalso erlang:is_process_alive(Pid)
-                    end,
-                    Leaders
-                )
-            of
-                true ->
-                    connected;
-                false ->
-                    connecting
-            end;
+            %% Kafka is considered healthy as long as any of the partition leader is reachable.
+            lists:filtermap(
+                fun({_Partition, Pid}) ->
+                    case is_pid(Pid) andalso erlang:is_process_alive(Pid) of
+                        true -> {true, Pid};
+                        _ -> false
+                    end
+                end,
+                Leaders
+            );
         {error, _} ->
-            connecting
+            []
     end.
 
+do_get_topic_status(Hosts, KafkaConfig, KafkaTopic) ->
+    CheckTopicFun =
+        fun() ->
+            wolff_client:check_if_topic_exists(Hosts, KafkaConfig, KafkaTopic)
+        end,
+    try
+        case emqx_utils:nolink_apply(CheckTopicFun, 5_000) of
+            ok -> ok;
+            {error, unknown_topic_or_partition} -> unhealthy_target;
+            _ -> error
+        end
+    catch
+        _:_ ->
+            error
+    end.
+
 ssl(#{enable := true} = SSL) ->
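Editor's note: the topic probe added above boils down to a single wolff_client:check_if_topic_exists/3 call guarded by a timeout. A minimal, illustrative sketch of exercising that check outside the bridge is shown below; the module name, the example return atoms, and the idea of calling it standalone are assumptions for illustration, not part of this change.

%% Illustrative sketch only (not part of this PR): probe a Kafka topic the same
%% way do_get_topic_status/3 above does, minus the emqx_utils:nolink_apply wrapper.
-module(kafka_topic_probe_example).
-export([probe/3]).

%% Hosts is a list of {Host, Port} tuples; ConnConfig is a wolff client/connection
%% config map, as in the bridge code above.
probe(Hosts, ConnConfig, Topic) ->
    case wolff_client:check_if_topic_exists(Hosts, ConnConfig, Topic) of
        ok -> healthy;
        {error, unknown_topic_or_partition} -> unhealthy_target;
        {error, _Reason} -> error
    end.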

View File

@@ -472,6 +472,38 @@ t_failed_creation_then_fix(Config) ->
     delete_all_bridges(),
     ok.
 
+t_table_removed(_Config) ->
+    HostsString = kafka_hosts_string_sasl(),
+    AuthSettings = valid_sasl_plain_settings(),
+    Hash = erlang:phash2([HostsString, ?FUNCTION_NAME]),
+    Type = ?BRIDGE_TYPE,
+    Name = "kafka_bridge_name_" ++ erlang:integer_to_list(Hash),
+    ResourceId = emqx_bridge_resource:resource_id(Type, Name),
+    BridgeId = emqx_bridge_resource:bridge_id(Type, Name),
+    KafkaTopic = "undefined-test-topic",
+    Conf = config(#{
+        "authentication" => AuthSettings,
+        "kafka_hosts_string" => HostsString,
+        "kafka_topic" => KafkaTopic,
+        "instance_id" => ResourceId,
+        "producer" => #{
+            "kafka" => #{
+                "buffer" => #{
+                    "memory_overload_protection" => false
+                }
+            }
+        },
+        "ssl" => #{}
+    }),
+    {ok, #{config := ValidConfigAtom1}} = emqx_bridge:create(
+        Type, erlang:list_to_atom(Name), Conf
+    ),
+    ValidConfigAtom = ValidConfigAtom1#{bridge_name => Name},
+    ?assertThrow(_, ?PRODUCER:on_start(ResourceId, ValidConfigAtom)),
+    ok = emqx_bridge_resource:remove(BridgeId),
+    delete_all_bridges(),
+    ok.
+
 %%------------------------------------------------------------------------------
 %% Helper functions
 %%------------------------------------------------------------------------------

View File

@@ -179,18 +179,39 @@ sql_drop_table() ->
 sql_check_table_exist() ->
     "SELECT COUNT(*) FROM user_tables WHERE table_name = 'MQTT_TEST'".
 
+new_jamdb_connection(Config) ->
+    JamdbOpts = [
+        {host, ?config(oracle_host, Config)},
+        {port, ?config(oracle_port, Config)},
+        {user, "system"},
+        {password, "oracle"},
+        {sid, ?SID}
+    ],
+    jamdb_oracle:start(JamdbOpts).
+
+close_jamdb_connection(Conn) ->
+    jamdb_oracle:stop(Conn).
+
 reset_table(Config) ->
-    ResourceId = resource_id(Config),
-    drop_table_if_exists(Config),
-    {ok, [{proc_result, 0, _}]} = emqx_resource:simple_sync_query(
-        ResourceId, {sql, sql_create_table()}
-    ),
+    {ok, Conn} = new_jamdb_connection(Config),
+    try
+        ok = drop_table_if_exists(Conn),
+        {ok, [{proc_result, 0, _}]} = jamdb_oracle:sql_query(Conn, sql_create_table())
+    after
+        close_jamdb_connection(Conn)
+    end,
     ok.
 
+drop_table_if_exists(Conn) when is_pid(Conn) ->
+    {ok, [{proc_result, 0, _}]} = jamdb_oracle:sql_query(Conn, sql_drop_table()),
+    ok;
 drop_table_if_exists(Config) ->
-    ResourceId = resource_id(Config),
-    {ok, [{proc_result, 0, _}]} =
-        emqx_resource:simple_sync_query(ResourceId, {query, sql_drop_table()}),
+    {ok, Conn} = new_jamdb_connection(Config),
+    try
+        ok = drop_table_if_exists(Conn)
+    after
+        close_jamdb_connection(Conn)
+    end,
     ok.
 
 oracle_config(TestCase, _ConnectionType, Config) ->
@@ -216,7 +237,7 @@ oracle_config(TestCase, _ConnectionType, Config) ->
             " pool_size = 1\n"
             " sql = \"~s\"\n"
             " resource_opts = {\n"
-            " health_check_interval = \"5s\"\n"
+            " health_check_interval = \"15s\"\n"
             " request_ttl = \"30s\"\n"
            " query_mode = \"async\"\n"
             " batch_size = 3\n"
@@ -349,13 +370,13 @@ t_sync_query(Config) ->
     ResourceId = resource_id(Config),
     ?check_trace(
         begin
+            reset_table(Config),
             ?assertMatch({ok, _}, create_bridge_api(Config)),
             ?retry(
                 _Sleep = 1_000,
                 _Attempts = 20,
                 ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId))
             ),
-            reset_table(Config),
             MsgId = erlang:unique_integer(),
             Params = #{
                 topic => ?config(mqtt_topic, Config),
@@ -381,13 +402,13 @@ t_batch_sync_query(Config) ->
     BridgeId = bridge_id(Config),
     ?check_trace(
         begin
+            reset_table(Config),
             ?assertMatch({ok, _}, create_bridge_api(Config)),
             ?retry(
                 _Sleep = 1_000,
                 _Attempts = 30,
                 ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId))
             ),
-            reset_table(Config),
             MsgId = erlang:unique_integer(),
             Params = #{
                 topic => ?config(mqtt_topic, Config),
@@ -464,6 +485,7 @@ t_start_stop(Config) ->
     ResourceId = resource_id(Config),
     ?check_trace(
         begin
+            reset_table(Config),
             ?assertMatch({ok, _}, create_bridge(Config)),
             %% Since the connection process is async, we give it some time to
             %% stabilize and avoid flakiness.
@@ -515,6 +537,7 @@ t_on_get_status(Config) ->
     ProxyHost = ?config(proxy_host, Config),
     ProxyName = ?config(proxy_name, Config),
     ResourceId = resource_id(Config),
+    reset_table(Config),
     ?assertMatch({ok, _}, create_bridge(Config)),
     %% Since the connection process is async, we give it some time to
     %% stabilize and avoid flakiness.
@@ -547,10 +570,45 @@ t_no_sid_nor_service_name(Config0) ->
     ),
     ok.
 
+t_missing_table(Config) ->
+    ResourceId = resource_id(Config),
+    ?check_trace(
+        begin
+            drop_table_if_exists(Config),
+            ?assertMatch({ok, _}, create_bridge_api(Config)),
+            ?retry(
+                _Sleep = 1_000,
+                _Attempts = 20,
+                ?assertMatch(
+                    {ok, Status} when Status =:= disconnected orelse Status =:= connecting,
+                    emqx_resource_manager:health_check(ResourceId)
+                )
+            ),
+            MsgId = erlang:unique_integer(),
+            Params = #{
+                topic => ?config(mqtt_topic, Config),
+                id => MsgId,
+                payload => ?config(oracle_name, Config),
+                retain => true
+            },
+            Message = {send_message, Params},
+            ?assertMatch(
+                {error, {resource_error, #{reason := not_connected}}},
+                emqx_resource:simple_sync_query(ResourceId, Message)
+            ),
+            ok
+        end,
+        fun(Trace) ->
+            ?assertNotMatch([], ?of_kind(oracle_undefined_table, Trace)),
+            ok
+        end
+    ).
+
 t_table_removed(Config) ->
     ResourceId = resource_id(Config),
     ?check_trace(
         begin
+            reset_table(Config),
             ?assertMatch({ok, _}, create_bridge_api(Config)),
             ?retry(
                 _Sleep = 1_000,

View File

@@ -257,6 +257,12 @@ query_resource(Config, Request) ->
     ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name),
     emqx_resource:query(ResourceID, Request, #{timeout => 1_000}).
 
+query_resource_sync(Config, Request) ->
+    Name = ?config(pgsql_name, Config),
+    BridgeType = ?config(pgsql_bridge_type, Config),
+    ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name),
+    emqx_resource_buffer_worker:simple_sync_query(ResourceID, Request).
+
 query_resource_async(Config, Request) ->
     query_resource_async(Config, Request, _Opts = #{}).
@@ -634,3 +640,64 @@ t_nasty_sql_string(Config) ->
         1_000
     ),
     ?assertEqual(Payload, connect_and_get_payload(Config)).
+
+t_missing_table(Config) ->
+    Name = ?config(pgsql_name, Config),
+    BridgeType = ?config(pgsql_bridge_type, Config),
+    ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name),
+    ?check_trace(
+        begin
+            connect_and_drop_table(Config),
+            ?assertMatch({ok, _}, create_bridge(Config)),
+            ?retry(
+                _Sleep = 1_000,
+                _Attempts = 20,
+                ?assertMatch(
+                    {ok, Status} when Status == connecting orelse Status == disconnected,
+                    emqx_resource_manager:health_check(ResourceID)
+                )
+            ),
+            Val = integer_to_binary(erlang:unique_integer()),
+            SentData = #{payload => Val, timestamp => 1668602148000},
+            Timeout = 1000,
+            ?assertMatch(
+                {error, {resource_error, #{reason := unhealthy_target}}},
+                query_resource(Config, {send_message, SentData, [], Timeout})
+            ),
+            ok
+        end,
+        fun(Trace) ->
+            ?assertMatch([_, _, _], ?of_kind(pgsql_undefined_table, Trace)),
+            ok
+        end
+    ),
+    connect_and_create_table(Config),
+    ok.
+
+t_table_removed(Config) ->
+    Name = ?config(pgsql_name, Config),
+    BridgeType = ?config(pgsql_bridge_type, Config),
+    ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name),
+    ?check_trace(
+        begin
+            connect_and_create_table(Config),
+            ?assertMatch({ok, _}, create_bridge(Config)),
+            ?retry(
+                _Sleep = 1_000,
+                _Attempts = 20,
+                ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceID))
+            ),
+            connect_and_drop_table(Config),
+            Val = integer_to_binary(erlang:unique_integer()),
+            SentData = #{payload => Val, timestamp => 1668602148000},
+            ?assertMatch(
+                {error, {unrecoverable_error, {error, error, <<"42P01">>, undefined_table, _, _}}},
+                query_resource_sync(Config, {send_message, SentData, []})
+            ),
+            ok
+        end,
+        []
+    ),
+    connect_and_create_table(Config),
+    ok.

View File

@@ -222,6 +222,8 @@ on_get_status(_InstId, #{pool_name := PoolName} = State) ->
                 {ok, NState} ->
                     %% return new state with prepared statements
                     {connected, NState};
+                {error, {undefined_table, NState}} ->
+                    {disconnected, NState, unhealthy_target};
                 {error, _Reason} ->
                     %% do not log error, it is logged in prepare_sql_to_conn
                     connecting
@@ -233,7 +235,37 @@ on_get_status(_InstId, #{pool_name := PoolName} = State) ->
 do_get_status(Conn) ->
     ok == element(1, mysql:query(Conn, <<"SELECT count(1) AS T">>)).
 
-do_check_prepares(#{prepare_statement := Prepares}) when is_map(Prepares) ->
+do_check_prepares(
+    #{
+        pool_name := PoolName,
+        prepare_statement := #{send_message := SQL}
+    } = State
+) ->
+    % it's already connected. Verify if target table still exists
+    Workers = [Worker || {_WorkerName, Worker} <- ecpool:workers(PoolName)],
+    lists:foldl(
+        fun
+            (WorkerPid, ok) ->
+                case ecpool_worker:client(WorkerPid) of
+                    {ok, Conn} ->
+                        case mysql:prepare(Conn, get_status, SQL) of
+                            {error, {1146, _, _}} ->
+                                {error, {undefined_table, State}};
+                            {ok, Statement} ->
+                                mysql:unprepare(Conn, Statement);
+                            _ ->
+                                ok
+                        end;
+                    _ ->
+                        ok
+                end;
+            (_, Acc) ->
+                Acc
+        end,
+        ok,
+        Workers
+    );
+do_check_prepares(#{prepare_statement := Statement}) when is_map(Statement) ->
     ok;
 do_check_prepares(State = #{pool_name := PoolName, prepare_statement := {error, Prepares}}) ->
     %% retry to prepare
@@ -241,6 +273,9 @@ do_check_prepares(State = #{pool_name := PoolName, prepare_statement := {error,
         ok ->
             %% remove the error
            {ok, State#{prepare_statement => Prepares}};
+        {error, undefined_table} ->
+            %% indicate the error
+            {error, {undefined_table, State#{prepare_statement => {error, Prepares}}}};
         {error, Reason} ->
             {error, Reason}
     end.
@@ -320,6 +355,11 @@ prepare_sql_to_conn(Conn, [{Key, SQL} | PrepareList]) when is_pid(Conn) ->
         {ok, _Key} ->
             ?SLOG(info, LogMeta#{result => success}),
             prepare_sql_to_conn(Conn, PrepareList);
+        {error, {1146, _, _} = Reason} ->
+            %% Target table is not created
+            ?tp(mysql_undefined_table, #{}),
+            ?SLOG(error, LogMeta#{result => failed, reason => Reason}),
+            {error, undefined_table};
         {error, Reason} ->
             % FIXME: we should try to differ on transient failers and
             % syntax failures. Retrying syntax failures is not very productive.

View File

@@ -238,6 +238,8 @@ on_sql_query(InstId, PoolName, Type, NameOrSQL, Data) ->
             case Reason of
                 ecpool_empty ->
                     {error, {recoverable_error, Reason}};
+                {error, error, _, undefined_table, _, _} ->
+                    {error, {unrecoverable_error, Reason}};
                 _ ->
                     Result
             end;
@@ -269,7 +271,10 @@ on_get_status(_InstId, #{pool_name := PoolName} = State) ->
                 {ok, NState} ->
                     %% return new state with prepared statements
                     {connected, NState};
-                false ->
+                {error, {undefined_table, NState}} ->
+                    %% return new state indicating that we are connected but the target table is not created
+                    {disconnected, NState, unhealthy_target};
+                {error, _Reason} ->
                     %% do not log error, it is logged in prepare_sql_to_conn
                     connecting
             end;
@@ -280,6 +285,34 @@ on_get_status(_InstId, #{pool_name := PoolName} = State) ->
 do_get_status(Conn) ->
     ok == element(1, epgsql:squery(Conn, "SELECT count(1) AS T")).
 
+do_check_prepares(
+    #{
+        pool_name := PoolName,
+        prepare_sql := #{<<"send_message">> := SQL}
+    } = State
+) ->
+    % it's already connected. Verify if target table still exists
+    Workers = [Worker || {_WorkerName, Worker} <- ecpool:workers(PoolName)],
+    lists:foldl(
+        fun
+            (WorkerPid, ok) ->
+                case ecpool_worker:client(WorkerPid) of
+                    {ok, Conn} ->
+                        case epgsql:parse2(Conn, "get_status", SQL, []) of
+                            {error, {_, _, _, undefined_table, _, _}} ->
+                                {error, {undefined_table, State}};
+                            _ ->
+                                ok
+                        end;
+                    _ ->
+                        ok
+                end;
+            (_, Acc) ->
+                Acc
+        end,
+        ok,
+        Workers
+    );
 do_check_prepares(#{prepare_sql := Prepares}) when is_map(Prepares) ->
     ok;
 do_check_prepares(State = #{pool_name := PoolName, prepare_sql := {error, Prepares}}) ->
@@ -288,8 +321,11 @@ do_check_prepares(State = #{pool_name := PoolName, prepare_sql := {error, Prepar
         {ok, Sts} ->
             %% remove the error
             {ok, State#{prepare_sql => Prepares, prepare_statement := Sts}};
-        _Error ->
-            false
+        {error, undefined_table} ->
+            %% indicate the error
+            {error, {undefined_table, State#{prepare_sql => {error, Prepares}}}};
+        Error ->
+            {error, Error}
     end.
 
 %% ===================================================================
@@ -373,7 +409,7 @@ init_prepare(State = #{prepare_sql := Prepares, pool_name := PoolName}) ->
                 msg => <<"PostgreSQL init prepare statement failed">>, error => Error
             },
             ?SLOG(error, LogMeta),
-            %% mark the prepare_sqlas failed
+            %% mark the prepare_sql as failed
             State#{prepare_sql => {error, Prepares}}
         end
     end.
@@ -414,6 +450,11 @@ prepare_sql_to_conn(Conn, [{Key, SQL} | PrepareList], Statements) when is_pid(Co
     case epgsql:parse2(Conn, Key, SQL, []) of
         {ok, Statement} ->
             prepare_sql_to_conn(Conn, PrepareList, Statements#{Key => Statement});
+        {error, {error, error, _, undefined_table, _, _} = Error} ->
+            %% Target table is not created
+            ?tp(pgsql_undefined_table, #{}),
+            ?SLOG(error, LogMeta#{msg => "PostgreSQL parse failed", error => Error}),
+            {error, undefined_table};
         {error, Error} = Other ->
             ?SLOG(error, LogMeta#{msg => "PostgreSQL parse failed", error => Error}),
             Other
@@ -424,6 +465,10 @@ to_bin(Bin) when is_binary(Bin) ->
 to_bin(Atom) when is_atom(Atom) ->
     erlang:atom_to_binary(Atom).
 
+handle_result({error, {recoverable_error, _Error}} = Res) ->
+    Res;
+handle_result({error, {unrecoverable_error, _Error}} = Res) ->
+    Res;
 handle_result({error, disconnected}) ->
     {error, {recoverable_error, disconnected}};
 handle_result({error, Error}) ->

View File

@@ -9,8 +9,6 @@
 -include_lib("emqx/include/logger.hrl").
 -include_lib("snabbkaffe/include/snabbkaffe.hrl").
 
--define(ORACLE_DEFAULT_PORT, 1521).
-
 %%====================================================================
 %% Exports
 %%====================================================================
@@ -26,7 +24,7 @@
 ]).
 
 %% callbacks for ecpool
--export([connect/1, prepare_sql_to_conn/2]).
+-export([connect/1, prepare_sql_to_conn/3]).
 
 %% Internal exports used to execute code with ecpool worker
 -export([
@@ -39,18 +37,15 @@
     oracle_host_options/0
 ]).
 
--define(ACTION_SEND_MESSAGE, send_message).
+-define(ORACLE_DEFAULT_PORT, 1521).
 
 -define(SYNC_QUERY_MODE, no_handover).
 
--define(DEFAULT_POOL_SIZE, 8).
--define(OPT_TIMEOUT, 30000).
--define(MAX_CURSORS, 10).
-
 -define(ORACLE_HOST_OPTIONS, #{
     default_port => ?ORACLE_DEFAULT_PORT
 }).
+-define(MAX_CURSORS, 10).
+-define(DEFAULT_POOL_SIZE, 8).
+-define(OPT_TIMEOUT, 30000).
 
 -type prepares() :: #{atom() => binary()}.
 -type params_tokens() :: #{atom() => list()}.
@@ -105,7 +100,7 @@ on_start(
     ],
     PoolName = InstId,
     Prepares = parse_prepare_sql(Config),
-    InitState = #{pool_name => PoolName, prepare_statement => #{}},
+    InitState = #{pool_name => PoolName},
     State = maps:merge(InitState, Prepares),
     case emqx_resource_pool:start(InstId, ?MODULE, Options) of
         ok ->
@@ -148,7 +143,7 @@ on_query(
 on_batch_query(
     InstId,
     BatchReq,
-    #{pool_name := PoolName, params_tokens := Tokens, prepare_statement := Sts} = State
+    #{pool_name := PoolName, params_tokens := Tokens, prepare_sql := Sts} = State
 ) ->
     case BatchReq of
         [{Key, _} = Request | _] ->
@@ -241,7 +236,13 @@ on_get_status(_InstId, #{pool_name := Pool} = State) ->
                     connected;
                 {ok, NState} ->
                     %% return new state with prepared statements
-                    {connected, NState}
+                    {connected, NState};
+                {error, {undefined_table, NState}} ->
+                    %% return new state indicating that we are connected but the target table is not created
+                    {disconnected, NState, unhealthy_target};
+                {error, _Reason} ->
+                    %% do not log error, it is logged in prepare_sql_to_conn
+                    connecting
             end;
         false ->
             disconnected
@@ -250,11 +251,46 @@ on_get_status(_InstId, #{pool_name := Pool} = State) ->
 do_get_status(Conn) ->
     ok == element(1, jamdb_oracle:sql_query(Conn, "select 1 from dual")).
 
-do_check_prepares(#{prepare_sql := Prepares}) when is_map(Prepares) ->
-    ok;
-do_check_prepares(State = #{pool_name := PoolName, prepare_sql := {error, Prepares}}) ->
-    {ok, Sts} = prepare_sql(Prepares, PoolName),
-    {ok, State#{prepare_sql => Prepares, prepare_statement := Sts}}.
+do_check_prepares(
+    #{
+        pool_name := PoolName,
+        prepare_sql := #{<<"send_message">> := SQL},
+        params_tokens := #{<<"send_message">> := Tokens}
+    } = State
+) ->
+    % it's already connected. Verify if target table still exists
+    Workers = [Worker || {_WorkerName, Worker} <- ecpool:workers(PoolName)],
+    lists:foldl(
+        fun
+            (WorkerPid, ok) ->
+                case ecpool_worker:client(WorkerPid) of
+                    {ok, Conn} ->
+                        case check_if_table_exists(Conn, SQL, Tokens) of
+                            {error, undefined_table} -> {error, {undefined_table, State}};
+                            _ -> ok
+                        end;
+                    _ ->
+                        ok
+                end;
+            (_, Acc) ->
+                Acc
+        end,
+        ok,
+        Workers
+    );
+do_check_prepares(
+    State = #{pool_name := PoolName, prepare_sql := {error, Prepares}, params_tokens := TokensMap}
+) ->
+    case prepare_sql(Prepares, PoolName, TokensMap) of
+        %% remove the error
+        {ok, Sts} ->
+            {ok, State#{prepare_sql => Sts}};
+        {error, undefined_table} ->
+            %% indicate the error
+            {error, {undefined_table, State#{prepare_sql => {error, Prepares}}}};
+        {error, _Reason} = Error ->
+            Error
+    end.
 
 %% ===================================================================
@@ -312,36 +348,81 @@ parse_prepare_sql([], Prepares, Tokens) ->
         params_tokens => Tokens
     }.
 
-init_prepare(State = #{prepare_sql := Prepares, pool_name := PoolName}) ->
-    {ok, Sts} = prepare_sql(Prepares, PoolName),
-    State#{prepare_statement := Sts}.
+init_prepare(State = #{prepare_sql := Prepares, pool_name := PoolName, params_tokens := TokensMap}) ->
+    case prepare_sql(Prepares, PoolName, TokensMap) of
+        {ok, Sts} ->
+            State#{prepare_sql := Sts};
+        Error ->
+            LogMeta = #{
+                msg => <<"Oracle Database init prepare statement failed">>, error => Error
+            },
+            ?SLOG(error, LogMeta),
+            %% mark the prepare_sql as failed
+            State#{prepare_sql => {error, Prepares}}
+    end.
 
-prepare_sql(Prepares, PoolName) when is_map(Prepares) ->
-    prepare_sql(maps:to_list(Prepares), PoolName);
-prepare_sql(Prepares, PoolName) ->
-    Data = do_prepare_sql(Prepares, PoolName),
-    {ok, _Sts} = Data,
-    ecpool:add_reconnect_callback(PoolName, {?MODULE, prepare_sql_to_conn, [Prepares]}),
-    Data.
+prepare_sql(Prepares, PoolName, TokensMap) when is_map(Prepares) ->
+    prepare_sql(maps:to_list(Prepares), PoolName, TokensMap);
+prepare_sql(Prepares, PoolName, TokensMap) ->
+    case do_prepare_sql(Prepares, PoolName, TokensMap) of
+        {ok, _Sts} = Ok ->
+            %% prepare for reconnect
+            ecpool:add_reconnect_callback(PoolName, {?MODULE, prepare_sql_to_conn, [Prepares]}),
+            Ok;
+        Error ->
+            Error
+    end.
 
-do_prepare_sql(Prepares, PoolName) ->
-    do_prepare_sql(ecpool:workers(PoolName), Prepares, PoolName, #{}).
+do_prepare_sql(Prepares, PoolName, TokensMap) ->
+    do_prepare_sql(ecpool:workers(PoolName), Prepares, PoolName, TokensMap, #{}).
 
-do_prepare_sql([{_Name, Worker} | T], Prepares, PoolName, _LastSts) ->
+do_prepare_sql([{_Name, Worker} | T], Prepares, PoolName, TokensMap, _LastSts) ->
     {ok, Conn} = ecpool_worker:client(Worker),
-    {ok, Sts} = prepare_sql_to_conn(Conn, Prepares),
-    do_prepare_sql(T, Prepares, PoolName, Sts);
-do_prepare_sql([], _Prepares, _PoolName, LastSts) ->
+    case prepare_sql_to_conn(Conn, Prepares, TokensMap) of
+        {ok, Sts} ->
+            do_prepare_sql(T, Prepares, PoolName, TokensMap, Sts);
+        Error ->
+            Error
+    end;
+do_prepare_sql([], _Prepares, _PoolName, _TokensMap, LastSts) ->
     {ok, LastSts}.
 
-prepare_sql_to_conn(Conn, Prepares) ->
-    prepare_sql_to_conn(Conn, Prepares, #{}).
+prepare_sql_to_conn(Conn, Prepares, TokensMap) ->
+    prepare_sql_to_conn(Conn, Prepares, TokensMap, #{}).
 
-prepare_sql_to_conn(Conn, [], Statements) when is_pid(Conn) -> {ok, Statements};
-prepare_sql_to_conn(Conn, [{Key, SQL} | PrepareList], Statements) when is_pid(Conn) ->
+prepare_sql_to_conn(Conn, [], _TokensMap, Statements) when is_pid(Conn) -> {ok, Statements};
+prepare_sql_to_conn(Conn, [{Key, SQL} | PrepareList], TokensMap, Statements) when is_pid(Conn) ->
     LogMeta = #{msg => "Oracle Database Prepare Statement", name => Key, prepare_sql => SQL},
+    Tokens = maps:get(Key, TokensMap, []),
     ?SLOG(info, LogMeta),
-    prepare_sql_to_conn(Conn, PrepareList, Statements#{Key => SQL}).
+    case check_if_table_exists(Conn, SQL, Tokens) of
+        ok ->
+            ?SLOG(info, LogMeta#{result => success}),
+            prepare_sql_to_conn(Conn, PrepareList, TokensMap, Statements#{Key => SQL});
+        {error, undefined_table} = Error ->
+            %% Target table is not created
+            ?SLOG(error, LogMeta#{result => failed, reason => "table does not exist"}),
+            ?tp(oracle_undefined_table, #{}),
+            Error;
+        Error ->
+            Error
+    end.
+
+check_if_table_exists(Conn, SQL, Tokens) ->
+    {Event, _Headers} = emqx_rule_events:eventmsg_publish(
+        emqx_message:make(<<"t/opic">>, "test query")
+    ),
+    SqlQuery = "begin " ++ binary_to_list(SQL) ++ "; rollback; end;",
+    Params = emqx_placeholder:proc_sql(Tokens, Event),
+    case jamdb_oracle:sql_query(Conn, {SqlQuery, Params}) of
+        {ok, [{proc_result, 0, _Description}]} ->
+            ok;
+        {ok, [{proc_result, 6550, _Description}]} ->
+            %% Target table is not created
+            {error, undefined_table};
+        Reason ->
+            {error, Reason}
+    end.
 
 to_bin(Bin) when is_binary(Bin) ->
     Bin;

View File

@@ -278,20 +278,22 @@ query(ResId, Request) ->
     Result :: term().
 query(ResId, Request, Opts) ->
     case emqx_resource_manager:lookup_cached(ResId) of
-        {ok, _Group, #{query_mode := QM}} ->
-            case QM of
-                simple_async ->
+        {ok, _Group, #{query_mode := QM, error := Error}} ->
+            case {QM, Error} of
+                {_, unhealthy_target} ->
+                    ?RESOURCE_ERROR(unhealthy_target, "unhealthy target");
+                {simple_async, _} ->
                     %% TODO(5.1.1): pass Resource instead of ResId to simple APIs
                     %% so the buffer worker does not need to lookup the cache again
                     Opts1 = Opts#{is_buffer_supported => true},
                     emqx_resource_buffer_worker:simple_async_query(ResId, Request, Opts1);
-                simple_sync ->
+                {simple_sync, _} ->
                     %% TODO(5.1.1): pass Resource instead of ResId to simple APIs
                     %% so the buffer worker does not need to lookup the cache again
                     emqx_resource_buffer_worker:simple_sync_query(ResId, Request);
-                sync ->
+                {sync, _} ->
                     emqx_resource_buffer_worker:sync_query(ResId, Request, Opts);
-                async ->
+                {async, _} ->
                     emqx_resource_buffer_worker:async_query(ResId, Request, Opts)
             end;
         {error, not_found} ->

View File

@@ -1002,6 +1002,8 @@ call_query(QM, Id, Index, Ref, Query, QueryOpts) ->
     case emqx_resource_manager:lookup_cached(Id) of
         {ok, _Group, #{status := stopped}} ->
             ?RESOURCE_ERROR(stopped, "resource stopped or disabled");
+        {ok, _Group, #{status := connecting, error := unhealthy_target}} ->
+            {error, {unrecoverable_error, unhealthy_target}};
         {ok, _Group, Resource} ->
             do_call_query(QM, Id, Index, Ref, Query, QueryOpts, Resource);
         {error, not_found} ->

View File

@@ -0,0 +1 @@
+Changes health check for Oracle Database, PostgreSQL, MySQL and Kafka Producer data bridges to ensure the target table/topic exists.
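Editor's note: to illustrate the convention this changelog entry refers to, here is a minimal, hypothetical sketch (the module name and the check_target/1 helper are invented for illustration, not part of the code base): a connector's on_get_status/2 callback reports {disconnected, State, unhealthy_target} when the target table or topic is missing, and emqx_resource:query/3 (see the diff above) then rejects requests with a resource_error instead of buffering them.

%% Hypothetical connector skeleton, for illustration only.
-module(example_target_aware_connector).
-export([on_get_status/2]).

on_get_status(_InstId, #{target := Target} = State) ->
    case check_target(Target) of
        exists ->
            connected;
        missing ->
            %% `unhealthy_target' is stored as the resource error; subsequent
            %% queries are answered with a resource_error instead of being buffered.
            {disconnected, State, unhealthy_target};
        unknown ->
            connecting
    end.

%% Placeholder probe; a real connector would query its database or broker here.
check_target(_Target) ->
    exists.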

View File

@@ -110,6 +110,7 @@ end_per_suite(_Config) ->
     ok.
 
 init_per_testcase(_Testcase, Config) ->
+    connect_and_create_table(Config),
     connect_and_clear_table(Config),
     delete_bridge(Config),
     snabbkaffe:start_trace(),
@@ -241,6 +242,12 @@ query_resource(Config, Request) ->
     ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name),
     emqx_resource:query(ResourceID, Request, #{timeout => 500}).
 
+sync_query_resource(Config, Request) ->
+    Name = ?config(mysql_name, Config),
+    BridgeType = ?config(mysql_bridge_type, Config),
+    ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name),
+    emqx_resource_buffer_worker:simple_sync_query(ResourceID, Request).
+
 query_resource_async(Config, Request) ->
     Name = ?config(mysql_name, Config),
     BridgeType = ?config(mysql_bridge_type, Config),
@@ -480,6 +487,7 @@ t_write_timeout(Config) ->
     ProxyHost = ?config(proxy_host, Config),
     QueryMode = ?config(query_mode, Config),
     {ok, _} = create_bridge(Config),
+    connect_and_create_table(Config),
     Val = integer_to_binary(erlang:unique_integer()),
     SentData = #{payload => Val, timestamp => 1668602148000},
     Timeout = 1000,
@@ -641,6 +649,7 @@ t_workload_fits_prepared_statement_limit(Config) ->
     ).
 
 t_unprepared_statement_query(Config) ->
+    ok = connect_and_create_table(Config),
     ?assertMatch(
         {ok, _},
         create_bridge(Config)
@@ -668,6 +677,7 @@ t_unprepared_statement_query(Config) ->
 %% Test doesn't work with batch enabled since batch doesn't use
 %% prepared statements as such; it has its own query generation process
 t_uninitialized_prepared_statement(Config) ->
+    connect_and_create_table(Config),
     ?assertMatch(
         {ok, _},
         create_bridge(Config)
@@ -705,3 +715,64 @@ t_uninitialized_prepared_statement(Config) ->
         end
     ),
     ok.
+
+t_missing_table(Config) ->
+    Name = ?config(mysql_name, Config),
+    BridgeType = ?config(mysql_bridge_type, Config),
+    ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name),
+    ?check_trace(
+        begin
+            connect_and_drop_table(Config),
+            ?assertMatch({ok, _}, create_bridge(Config)),
+            ?retry(
+                _Sleep = 1_000,
+                _Attempts = 20,
+                ?assertMatch(
+                    {ok, Status} when Status == connecting orelse Status == disconnected,
+                    emqx_resource_manager:health_check(ResourceID)
+                )
+            ),
+            Val = integer_to_binary(erlang:unique_integer()),
+            SentData = #{payload => Val, timestamp => 1668602148000},
+            Timeout = 1000,
+            ?assertMatch(
+                {error, {resource_error, #{reason := unhealthy_target}}},
+                query_resource(Config, {send_message, SentData, [], Timeout})
+            ),
+            ok
+        end,
+        fun(Trace) ->
+            ?assertMatch([_, _, _], ?of_kind(mysql_undefined_table, Trace)),
+            ok
+        end
+    ).
+
+t_table_removed(Config) ->
+    Name = ?config(mysql_name, Config),
+    BridgeType = ?config(mysql_bridge_type, Config),
+    ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name),
+    ?check_trace(
+        begin
+            connect_and_create_table(Config),
+            ?assertMatch({ok, _}, create_bridge(Config)),
+            ?retry(
+                _Sleep = 1_000,
+                _Attempts = 20,
+                ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceID))
+            ),
+            connect_and_drop_table(Config),
+            Val = integer_to_binary(erlang:unique_integer()),
+            SentData = #{payload => Val, timestamp => 1668602148000},
+            Timeout = 1000,
+            ?assertMatch(
+                {error,
+                    {unrecoverable_error,
+                        {1146, <<"42S02">>, <<"Table 'mqtt.mqtt_test' doesn't exist">>}}},
+                sync_query_resource(Config, {send_message, SentData, [], Timeout})
+            ),
+            ok
+        end,
+        []
+    ),
+    ok.

View File

@@ -196,7 +196,7 @@ defmodule EMQXUmbrella.MixProject do
       [
        {:hstreamdb_erl, github: "hstreamdb/hstreamdb_erl", tag: "0.2.5"},
        {:influxdb, github: "emqx/influxdb-client-erl", tag: "1.1.11", override: true},
-       {:wolff, github: "kafka4beam/wolff", tag: "1.7.5"},
+       {:wolff, github: "kafka4beam/wolff", tag: "1.7.6"},
        {:kafka_protocol, github: "kafka4beam/kafka_protocol", tag: "4.1.3", override: true},
        {:brod_gssapi, github: "kafka4beam/brod_gssapi", tag: "v0.1.0"},
        {:brod, github: "kafka4beam/brod", tag: "3.16.8"},