feat(kafka): check whether target topic exists

Fixes https://emqx.atlassian.net/browse/EMQX-9026
This commit is contained in:
Paulo Zulato 2023-05-17 21:43:28 -03:00
parent 5f10936091
commit 8430ec673c
8 changed files with 127 additions and 50 deletions

View File

@ -1,6 +1,6 @@
%% -*- mode: erlang; -*-
{erl_opts, [debug_info]}.
{deps, [ {wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "1.7.5"}}}
{deps, [ {wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "1.7.6"}}}
, {kafka_protocol, {git, "https://github.com/kafka4beam/kafka_protocol.git", {tag, "4.1.3"}}}
, {brod_gssapi, {git, "https://github.com/kafka4beam/brod_gssapi.git", {tag, "v0.1.0"}}}
, {brod, {git, "https://github.com/kafka4beam/brod.git", {tag, "3.16.8"}}}

View File

@ -73,6 +73,12 @@ on_start(InstId, Config) ->
sasl => emqx_bridge_kafka_impl:sasl(Auth),
ssl => ssl(SSL)
},
case do_get_topic_status(Hosts, KafkaConfig, KafkaTopic) of
unhealthy_target ->
throw(unhealthy_target);
_ ->
ok
end,
case wolff:ensure_supervised_client(ClientId, Hosts, ClientConfig) of
{ok, _} ->
?SLOG(info, #{
@ -108,7 +114,9 @@ on_start(InstId, Config) ->
kafka_topic => KafkaTopic,
producers => Producers,
resource_id => ResourceId,
sync_query_timeout => SyncQueryTimeout
sync_query_timeout => SyncQueryTimeout,
hosts => Hosts,
kafka_config => KafkaConfig
}};
{error, Reason2} ->
?SLOG(error, #{
@ -131,6 +139,7 @@ on_start(InstId, Config) ->
client_id => ClientId
}
),
throw(
"Failed to start Kafka client. Please check the logs for errors and check"
" the connection parameters."
@ -294,34 +303,60 @@ on_kafka_ack(_Partition, buffer_overflow_discarded, _Callback) ->
%% Note: since wolff client has its own replayq that is not managed by
%% `emqx_resource_buffer_worker', we must avoid returning `disconnected' here. Otherwise,
%% `emqx_resource_manager' will kill the wolff producers and messages might be lost.
on_get_status(_InstId, #{client_id := ClientId, kafka_topic := KafkaTopic}) ->
on_get_status(_InstId, #{client_id := ClientId} = State) ->
case wolff_client_sup:find_client(ClientId) of
{ok, Pid} ->
do_get_status(Pid, KafkaTopic);
case do_get_status(Pid, State) of
ok -> connected;
unhealthy_target -> {disconnected, State, unhealthy_target};
error -> connecting
end;
{error, _Reason} ->
connecting
end.
do_get_status(Client, KafkaTopic) ->
%% TODO: add a wolff_producers:check_connectivity
%% Combined health probe used by the status callback.
%%
%% Returns `unhealthy_target' as soon as `do_get_topic_status/3' reports it; the
%% partition leaders are not inspected in that case. For any other topic status
%% the result depends on `do_get_healthy_leaders/2': `error' when it yields `[]',
%% `ok' otherwise.
do_get_status(Client, #{kafka_topic := KafkaTopic, hosts := Hosts, kafka_config := KafkaConfig}) ->
    TopicStatus = do_get_topic_status(Hosts, KafkaConfig, KafkaTopic),
    case TopicStatus =:= unhealthy_target of
        true ->
            unhealthy_target;
        false ->
            %% Leaders are only queried once the topic is not known to be missing.
            HealthyLeaders = do_get_healthy_leaders(Client, KafkaTopic),
            case HealthyLeaders of
                [] -> error;
                _ -> ok
            end
    end.
do_get_healthy_leaders(Client, KafkaTopic) ->
case wolff_client:get_leader_connections(Client, KafkaTopic) of
{ok, Leaders} ->
%% Kafka is considered healthy as long as any of the partition leader is reachable
case
lists:any(
%% Kafka is considered healthy as long as any of the partition leaders is reachable.
lists:filtermap(
fun({_Partition, Pid}) ->
is_pid(Pid) andalso erlang:is_process_alive(Pid)
case is_pid(Pid) andalso erlang:is_process_alive(Pid) of
true -> {true, Pid};
_ -> false
end
end,
Leaders
)
of
true ->
connected;
false ->
connecting
end;
);
{error, _} ->
connecting
[]
end.
%% Asks Kafka whether `KafkaTopic' exists, using `Hosts' and `KafkaConfig' to connect.
%%
%% The lookup is run through `emqx_utils:nolink_apply/2' with `5_000' as its second
%% argument (presumably a timeout in milliseconds - confirm against emqx_utils).
%%
%% Returns:
%%   `ok'               - `wolff_client:check_if_topic_exists/3' returned `ok';
%%   `unhealthy_target' - it returned `{error, unknown_topic_or_partition}';
%%   `error'            - any other return value, or any exception raised by the call.
do_get_topic_status(Hosts, KafkaConfig, KafkaTopic) ->
    TopicLookup = fun() ->
        wolff_client:check_if_topic_exists(Hosts, KafkaConfig, KafkaTopic)
    end,
    try emqx_utils:nolink_apply(TopicLookup, 5_000) of
        ok -> ok;
        {error, unknown_topic_or_partition} -> unhealthy_target;
        _Other -> error
    catch
        %% Best effort: a failed probe is reported as `error', never as a missing topic.
        _Class:_Reason -> error
    end.
ssl(#{enable := true} = SSL) ->

View File

@ -472,6 +472,38 @@ t_failed_creation_then_fix(Config) ->
delete_all_bridges(),
ok.
%% Starting the producer against a Kafka topic that does not exist must be rejected:
%% `on_start/2' is expected to throw `unhealthy_target'.
t_table_removed(_Config) ->
    HostsString = kafka_hosts_string_sasl(),
    AuthSettings = valid_sasl_plain_settings(),
    Hash = erlang:phash2([HostsString, ?FUNCTION_NAME]),
    Type = ?BRIDGE_TYPE,
    Name = "kafka_bridge_name_" ++ erlang:integer_to_list(Hash),
    ResourceId = emqx_bridge_resource:resource_id(Type, Name),
    BridgeId = emqx_bridge_resource:bridge_id(Type, Name),
    %% This topic is intentionally never created.
    KafkaTopic = "undefined-test-topic",
    Conf = config(#{
        "authentication" => AuthSettings,
        "kafka_hosts_string" => HostsString,
        "kafka_topic" => KafkaTopic,
        "instance_id" => ResourceId,
        "producer" => #{
            "kafka" => #{
                "buffer" => #{
                    "memory_overload_protection" => false
                }
            }
        },
        "ssl" => #{}
    }),
    {ok, #{config := ValidConfigAtom1}} = emqx_bridge:create(
        Type, erlang:list_to_atom(Name), Conf
    ),
    ValidConfigAtom = ValidConfigAtom1#{bridge_name => Name},
    %% Assert the specific `unhealthy_target' throw rather than any throw, so the test
    %% cannot pass because of an unrelated failure (e.g. the client failing to start).
    ?assertThrow(unhealthy_target, ?PRODUCER:on_start(ResourceId, ValidConfigAtom)),
    ok = emqx_bridge_resource:remove(BridgeId),
    delete_all_bridges(),
    ok.
%%------------------------------------------------------------------------------
%% Helper functions
%%------------------------------------------------------------------------------

View File

@ -222,12 +222,11 @@ on_get_status(_InstId, #{pool_name := PoolName} = State) ->
{ok, NState} ->
%% return new state with prepared statements
{connected, NState};
{error, {undefined_table, NState}} ->
{disconnected, NState, unhealthy_target};
{error, _Reason} ->
%% do not log error, it is logged in prepare_sql_to_conn
connecting;
{undefined_table, NState} ->
%% return new state indicating that we are connected but the target table is not created
{disconnected, NState, unhealthy_target}
connecting
end;
false ->
connecting
@ -251,7 +250,7 @@ do_check_prepares(
{ok, Conn} ->
case mysql:prepare(Conn, get_status, SQL) of
{error, {1146, _, _}} ->
{undefined_table, State};
{error, {undefined_table, State}};
{ok, Statement} ->
mysql:unprepare(Conn, Statement);
_ ->
@ -276,7 +275,7 @@ do_check_prepares(State = #{pool_name := PoolName, prepare_statement := {error,
{ok, State#{prepare_statement => Prepares}};
{error, undefined_table} ->
%% indicate the error
{undefined_table, State#{prepare_statement => {error, Prepares}}};
{error, {undefined_table, State#{prepare_statement => {error, Prepares}}}};
{error, Reason} ->
{error, Reason}
end.

View File

@ -271,12 +271,12 @@ on_get_status(_InstId, #{pool_name := PoolName} = State) ->
{ok, NState} ->
%% return new state with prepared statements
{connected, NState};
false ->
%% do not log error, it is logged in prepare_sql_to_conn
connecting;
{undefined_table, NState} ->
{error, {undefined_table, NState}} ->
%% return new state indicating that we are connected but the target table is not created
{disconnected, NState, unhealthy_target}
{disconnected, NState, unhealthy_target};
{error, _Reason} ->
%% do not log error, it is logged in prepare_sql_to_conn
connecting
end;
false ->
connecting
@ -296,10 +296,14 @@ do_check_prepares(
lists:foldl(
fun
(WorkerPid, ok) ->
{ok, Conn} = ecpool_worker:client(WorkerPid),
case ecpool_worker:client(WorkerPid) of
{ok, Conn} ->
case epgsql:parse2(Conn, "get_status", SQL, []) of
{error, {_, _, _, undefined_table, _, _}} ->
{undefined_table, State};
{error, {undefined_table, State}};
_ ->
ok
end;
_ ->
ok
end;
@ -319,9 +323,9 @@ do_check_prepares(State = #{pool_name := PoolName, prepare_sql := {error, Prepar
{ok, State#{prepare_sql => Prepares, prepare_statement := Sts}};
{error, undefined_table} ->
%% indicate the error
{undefined_table, State#{prepare_sql => {error, Prepares}}};
_Error ->
false
{error, {undefined_table, State#{prepare_sql => {error, Prepares}}}};
Error ->
{error, Error}
end.
%% ===================================================================

View File

@ -237,12 +237,12 @@ on_get_status(_InstId, #{pool_name := Pool} = State) ->
{ok, NState} ->
%% return new state with prepared statements
{connected, NState};
{error, {undefined_table, NState}} ->
%% return new state indicating that we are connected but the target table is not created
{disconnected, NState, unhealthy_target};
{error, _Reason} ->
%% do not log error, it is logged in prepare_sql_to_conn
connecting;
{undefined_table, NState} ->
%% return new state indicating that we are connected but the target table is not created
{disconnected, NState, unhealthy_target}
connecting
end;
false ->
disconnected
@ -263,11 +263,15 @@ do_check_prepares(
lists:foldl(
fun
(WorkerPid, ok) ->
{ok, Conn} = ecpool_worker:client(WorkerPid),
case ecpool_worker:client(WorkerPid) of
{ok, Conn} ->
case check_if_table_exists(Conn, SQL, Tokens) of
{error, undefined_table} -> {undefined_table, State};
{error, undefined_table} -> {error, {undefined_table, State}};
_ -> ok
end;
_ ->
ok
end;
(_, Acc) ->
Acc
end,
@ -283,7 +287,7 @@ do_check_prepares(
{ok, State#{prepare_sql => Sts}};
{error, undefined_table} ->
%% indicate the error
{undefined_table, State#{prepare_sql => {error, Prepares}}};
{error, {undefined_table, State#{prepare_sql => {error, Prepares}}}};
{error, _Reason} = Error ->
Error
end.

View File

@ -0,0 +1 @@
Changes the health check for the Oracle Database, PostgreSQL, MySQL and Kafka Producer data bridges to ensure that the target table/topic exists.

View File

@ -95,7 +95,9 @@ defmodule EMQXUmbrella.MixProject do
github: "emqx/ranch", ref: "de8ba2a00817c0a6eb1b8f20d6fb3e44e2c9a5aa", override: true},
# in conflict by grpc and eetcd
{:gpb, "4.19.7", override: true, runtime: false},
{:hackney, github: "emqx/hackney", tag: "1.18.1-1", override: true}
{:hackney, github: "emqx/hackney", tag: "1.18.1-1", override: true},
# set by hackney (dependency)
{:ssl_verify_fun, "1.1.6", override: true}
] ++
emqx_apps(profile_info, version) ++
enterprise_deps(profile_info) ++ bcrypt_dep() ++ jq_dep() ++ quicer_dep()
@ -194,7 +196,7 @@ defmodule EMQXUmbrella.MixProject do
[
{:hstreamdb_erl, github: "hstreamdb/hstreamdb_erl", tag: "0.2.5"},
{:influxdb, github: "emqx/influxdb-client-erl", tag: "1.1.11", override: true},
{:wolff, github: "kafka4beam/wolff", tag: "1.7.5"},
{:wolff, github: "kafka4beam/wolff", tag: "1.7.6"},
{:kafka_protocol, github: "kafka4beam/kafka_protocol", tag: "4.1.3", override: true},
{:brod_gssapi, github: "kafka4beam/brod_gssapi", tag: "v0.1.0"},
{:brod, github: "kafka4beam/brod", tag: "3.16.8"},