diff --git a/apps/emqx_bridge_kafka/rebar.config b/apps/emqx_bridge_kafka/rebar.config index cfe84a2a8..945ccbdba 100644 --- a/apps/emqx_bridge_kafka/rebar.config +++ b/apps/emqx_bridge_kafka/rebar.config @@ -1,6 +1,6 @@ %% -*- mode: erlang; -*- {erl_opts, [debug_info]}. -{deps, [ {wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "1.7.5"}}} +{deps, [ {wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "1.7.6"}}} , {kafka_protocol, {git, "https://github.com/kafka4beam/kafka_protocol.git", {tag, "4.1.3"}}} , {brod_gssapi, {git, "https://github.com/kafka4beam/brod_gssapi.git", {tag, "v0.1.0"}}} , {brod, {git, "https://github.com/kafka4beam/brod.git", {tag, "3.16.8"}}} diff --git a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl index e91cce600..be4c2d860 100644 --- a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl +++ b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl @@ -73,6 +73,12 @@ on_start(InstId, Config) -> sasl => emqx_bridge_kafka_impl:sasl(Auth), ssl => ssl(SSL) }, + case do_get_topic_status(Hosts, KafkaConfig, KafkaTopic) of + unhealthy_target -> + throw(unhealthy_target); + _ -> + ok + end, case wolff:ensure_supervised_client(ClientId, Hosts, ClientConfig) of {ok, _} -> ?SLOG(info, #{ @@ -108,7 +114,9 @@ on_start(InstId, Config) -> kafka_topic => KafkaTopic, producers => Producers, resource_id => ResourceId, - sync_query_timeout => SyncQueryTimeout + sync_query_timeout => SyncQueryTimeout, + hosts => Hosts, + kafka_config => KafkaConfig }}; {error, Reason2} -> ?SLOG(error, #{ @@ -131,6 +139,7 @@ on_start(InstId, Config) -> client_id => ClientId } ), + throw( "Failed to start Kafka client. Please check the logs for errors and check" " the connection parameters." @@ -294,34 +303,60 @@ on_kafka_ack(_Partition, buffer_overflow_discarded, _Callback) -> %% Note: since wolff client has its own replayq that is not managed by %% `emqx_resource_buffer_worker', we must avoid returning `disconnected' here. Otherwise, %% `emqx_resource_manager' will kill the wolff producers and messages might be lost. -on_get_status(_InstId, #{client_id := ClientId, kafka_topic := KafkaTopic}) -> +on_get_status(_InstId, #{client_id := ClientId} = State) -> case wolff_client_sup:find_client(ClientId) of {ok, Pid} -> - do_get_status(Pid, KafkaTopic); + case do_get_status(Pid, State) of + ok -> connected; + unhealthy_target -> {disconnected, State, unhealthy_target}; + error -> connecting + end; {error, _Reason} -> connecting end. -do_get_status(Client, KafkaTopic) -> - %% TODO: add a wolff_producers:check_connectivity +do_get_status(Client, #{kafka_topic := KafkaTopic, hosts := Hosts, kafka_config := KafkaConfig}) -> + case do_get_topic_status(Hosts, KafkaConfig, KafkaTopic) of + unhealthy_target -> + unhealthy_target; + _ -> + case do_get_healthy_leaders(Client, KafkaTopic) of + [] -> error; + _ -> ok + end + end. + +do_get_healthy_leaders(Client, KafkaTopic) -> case wolff_client:get_leader_connections(Client, KafkaTopic) of {ok, Leaders} -> - %% Kafka is considered healthy as long as any of the partition leader is reachable - case - lists:any( - fun({_Partition, Pid}) -> - is_pid(Pid) andalso erlang:is_process_alive(Pid) - end, - Leaders - ) - of - true -> - connected; - false -> - connecting - end; + %% Kafka is considered healthy as long as any of the partition leader is reachable. + lists:filtermap( + fun({_Partition, Pid}) -> + case is_pid(Pid) andalso erlang:is_process_alive(Pid) of + true -> {true, Pid}; + _ -> false + end + end, + Leaders + ); {error, _} -> - connecting + [] + end. + +do_get_topic_status(Hosts, KafkaConfig, KafkaTopic) -> + CheckTopicFun = + fun() -> + wolff_client:check_if_topic_exists(Hosts, KafkaConfig, KafkaTopic) + end, + try + case emqx_utils:nolink_apply(CheckTopicFun, 5_000) of + ok -> ok; + {error, unknown_topic_or_partition} -> unhealthy_target; + _ -> error + end + catch + _:_ -> + error end. ssl(#{enable := true} = SSL) -> diff --git a/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_producer_SUITE.erl b/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_producer_SUITE.erl index 6b7b961f6..6031c21cb 100644 --- a/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_producer_SUITE.erl +++ b/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_producer_SUITE.erl @@ -472,6 +472,38 @@ t_failed_creation_then_fix(Config) -> delete_all_bridges(), ok. +t_table_removed(_Config) -> + HostsString = kafka_hosts_string_sasl(), + AuthSettings = valid_sasl_plain_settings(), + Hash = erlang:phash2([HostsString, ?FUNCTION_NAME]), + Type = ?BRIDGE_TYPE, + Name = "kafka_bridge_name_" ++ erlang:integer_to_list(Hash), + ResourceId = emqx_bridge_resource:resource_id(Type, Name), + BridgeId = emqx_bridge_resource:bridge_id(Type, Name), + KafkaTopic = "undefined-test-topic", + Conf = config(#{ + "authentication" => AuthSettings, + "kafka_hosts_string" => HostsString, + "kafka_topic" => KafkaTopic, + "instance_id" => ResourceId, + "producer" => #{ + "kafka" => #{ + "buffer" => #{ + "memory_overload_protection" => false + } + } + }, + "ssl" => #{} + }), + {ok, #{config := ValidConfigAtom1}} = emqx_bridge:create( + Type, erlang:list_to_atom(Name), Conf + ), + ValidConfigAtom = ValidConfigAtom1#{bridge_name => Name}, + ?assertThrow(_, ?PRODUCER:on_start(ResourceId, ValidConfigAtom)), + ok = emqx_bridge_resource:remove(BridgeId), + delete_all_bridges(), + ok. + %%------------------------------------------------------------------------------ %% Helper functions %%------------------------------------------------------------------------------ diff --git a/apps/emqx_connector/src/emqx_connector_mysql.erl b/apps/emqx_connector/src/emqx_connector_mysql.erl index 4a3ad1f3b..2931a8ec5 100644 --- a/apps/emqx_connector/src/emqx_connector_mysql.erl +++ b/apps/emqx_connector/src/emqx_connector_mysql.erl @@ -222,12 +222,11 @@ on_get_status(_InstId, #{pool_name := PoolName} = State) -> {ok, NState} -> %% return new state with prepared statements {connected, NState}; + {error, {undefined_table, NState}} -> + {disconnected, NState, unhealthy_target}; {error, _Reason} -> %% do not log error, it is logged in prepare_sql_to_conn - connecting; - {undefined_table, NState} -> - %% return new state indicating that we are connected but the target table is not created - {disconnected, NState, unhealthy_target} + connecting end; false -> connecting @@ -251,7 +250,7 @@ do_check_prepares( {ok, Conn} -> case mysql:prepare(Conn, get_status, SQL) of {error, {1146, _, _}} -> - {undefined_table, State}; + {error, {undefined_table, State}}; {ok, Statement} -> mysql:unprepare(Conn, Statement); _ -> @@ -276,7 +275,7 @@ do_check_prepares(State = #{pool_name := PoolName, prepare_statement := {error, {ok, State#{prepare_statement => Prepares}}; {error, undefined_table} -> %% indicate the error - {undefined_table, State#{prepare_statement => {error, Prepares}}}; + {error, {undefined_table, State#{prepare_statement => {error, Prepares}}}}; {error, Reason} -> {error, Reason} end. diff --git a/apps/emqx_connector/src/emqx_connector_pgsql.erl b/apps/emqx_connector/src/emqx_connector_pgsql.erl index e0d3e834f..71d18f4a8 100644 --- a/apps/emqx_connector/src/emqx_connector_pgsql.erl +++ b/apps/emqx_connector/src/emqx_connector_pgsql.erl @@ -271,12 +271,12 @@ on_get_status(_InstId, #{pool_name := PoolName} = State) -> {ok, NState} -> %% return new state with prepared statements {connected, NState}; - false -> - %% do not log error, it is logged in prepare_sql_to_conn - connecting; - {undefined_table, NState} -> + {error, {undefined_table, NState}} -> %% return new state indicating that we are connected but the target table is not created - {disconnected, NState, unhealthy_target} + {disconnected, NState, unhealthy_target}; + {error, _Reason} -> + %% do not log error, it is logged in prepare_sql_to_conn + connecting end; false -> connecting @@ -296,10 +296,14 @@ do_check_prepares( lists:foldl( fun (WorkerPid, ok) -> - {ok, Conn} = ecpool_worker:client(WorkerPid), - case epgsql:parse2(Conn, "get_status", SQL, []) of - {error, {_, _, _, undefined_table, _, _}} -> - {undefined_table, State}; + case ecpool_worker:client(WorkerPid) of + {ok, Conn} -> + case epgsql:parse2(Conn, "get_status", SQL, []) of + {error, {_, _, _, undefined_table, _, _}} -> + {error, {undefined_table, State}}; + _ -> + ok + end; _ -> ok end; @@ -319,9 +323,9 @@ do_check_prepares(State = #{pool_name := PoolName, prepare_sql := {error, Prepar {ok, State#{prepare_sql => Prepares, prepare_statement := Sts}}; {error, undefined_table} -> %% indicate the error - {undefined_table, State#{prepare_sql => {error, Prepares}}}; - _Error -> - false + {error, {undefined_table, State#{prepare_sql => {error, Prepares}}}}; + Error -> + {error, Error} end. %% =================================================================== diff --git a/apps/emqx_oracle/src/emqx_oracle.erl b/apps/emqx_oracle/src/emqx_oracle.erl index 2d190057c..5a7f8d752 100644 --- a/apps/emqx_oracle/src/emqx_oracle.erl +++ b/apps/emqx_oracle/src/emqx_oracle.erl @@ -237,12 +237,12 @@ on_get_status(_InstId, #{pool_name := Pool} = State) -> {ok, NState} -> %% return new state with prepared statements {connected, NState}; + {error, {undefined_table, NState}} -> + %% return new state indicating that we are connected but the target table is not created + {disconnected, NState, unhealthy_target}; {error, _Reason} -> %% do not log error, it is logged in prepare_sql_to_conn - connecting; - {undefined_table, NState} -> - %% return new state indicating that we are connected but the target table is not created - {disconnected, NState, unhealthy_target} + connecting end; false -> disconnected @@ -263,10 +263,14 @@ do_check_prepares( lists:foldl( fun (WorkerPid, ok) -> - {ok, Conn} = ecpool_worker:client(WorkerPid), - case check_if_table_exists(Conn, SQL, Tokens) of - {error, undefined_table} -> {undefined_table, State}; - _ -> ok + case ecpool_worker:client(WorkerPid) of + {ok, Conn} -> + case check_if_table_exists(Conn, SQL, Tokens) of + {error, undefined_table} -> {error, {undefined_table, State}}; + _ -> ok + end; + _ -> + ok end; (_, Acc) -> Acc @@ -283,7 +287,7 @@ do_check_prepares( {ok, State#{prepare_sql => Sts}}; {error, undefined_table} -> %% indicate the error - {undefined_table, State#{prepare_sql => {error, Prepares}}}; + {error, {undefined_table, State#{prepare_sql => {error, Prepares}}}}; {error, _Reason} = Error -> Error end. diff --git a/changes/ee/fix-10645.en.md b/changes/ee/fix-10645.en.md new file mode 100644 index 000000000..e97bb1c74 --- /dev/null +++ b/changes/ee/fix-10645.en.md @@ -0,0 +1 @@ +Changes health check for Oracle Database, PostgreSql, MySql and Kafka Producer data bridges to ensure target table/topic exists. diff --git a/mix.exs b/mix.exs index 89354ea92..d76cd8f60 100644 --- a/mix.exs +++ b/mix.exs @@ -95,7 +95,9 @@ defmodule EMQXUmbrella.MixProject do github: "emqx/ranch", ref: "de8ba2a00817c0a6eb1b8f20d6fb3e44e2c9a5aa", override: true}, # in conflict by grpc and eetcd {:gpb, "4.19.7", override: true, runtime: false}, - {:hackney, github: "emqx/hackney", tag: "1.18.1-1", override: true} + {:hackney, github: "emqx/hackney", tag: "1.18.1-1", override: true}, + # set by hackney (dependency) + {:ssl_verify_fun, "1.1.6", override: true} ] ++ emqx_apps(profile_info, version) ++ enterprise_deps(profile_info) ++ bcrypt_dep() ++ jq_dep() ++ quicer_dep() @@ -194,7 +196,7 @@ defmodule EMQXUmbrella.MixProject do [ {:hstreamdb_erl, github: "hstreamdb/hstreamdb_erl", tag: "0.2.5"}, {:influxdb, github: "emqx/influxdb-client-erl", tag: "1.1.11", override: true}, - {:wolff, github: "kafka4beam/wolff", tag: "1.7.5"}, + {:wolff, github: "kafka4beam/wolff", tag: "1.7.6"}, {:kafka_protocol, github: "kafka4beam/kafka_protocol", tag: "4.1.3", override: true}, {:brod_gssapi, github: "kafka4beam/brod_gssapi", tag: "v0.1.0"}, {:brod, github: "kafka4beam/brod", tag: "3.16.8"},