diff --git a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl index 890a07a30..10e0ef1a5 100644 --- a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl +++ b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl @@ -102,6 +102,7 @@ on_start(InstId, Config) -> client_id => ClientId, resource_id => ResourceId, hosts => Hosts, + client_config => ClientConfig, installed_bridge_v2s => #{} }}. @@ -110,6 +111,7 @@ on_add_channel( #{ client_id := ClientId, hosts := Hosts, + client_config := ClientConfig, installed_bridge_v2s := InstalledBridgeV2s } = OldState, BridgeV2Id, @@ -117,7 +119,7 @@ on_add_channel( ) -> %% The following will throw an exception if the bridge producers fails to start {ok, BridgeV2State} = create_producers_for_bridge_v2( - InstId, BridgeV2Id, ClientId, Hosts, BridgeV2Config + InstId, BridgeV2Id, ClientId, Hosts, ClientConfig, BridgeV2Config ), NewInstalledBridgeV2s = maps:put(BridgeV2Id, BridgeV2State, InstalledBridgeV2s), %% Update state @@ -129,15 +131,17 @@ create_producers_for_bridge_v2( BridgeV2Id, ClientId, Hosts, + ClientConfig, #{ bridge_type := BridgeType, - kafka := #{ - message := MessageTemplate, - topic := KafkaTopic, - sync_query_timeout := SyncQueryTimeout - } = KafkaConfig + kafka := KafkaConfig } ) -> + #{ + message := MessageTemplate, + topic := KafkaTopic, + sync_query_timeout := SyncQueryTimeout + } = KafkaConfig, KafkaHeadersTokens = preproc_kafka_headers(maps:get(kafka_headers, KafkaConfig, undefined)), KafkaExtHeadersTokens = preproc_ext_headers(maps:get(kafka_ext_headers, KafkaConfig, [])), KafkaHeadersValEncodeMode = maps:get(kafka_header_value_encode_mode, KafkaConfig, none), @@ -150,7 +154,7 @@ create_producers_for_bridge_v2( _ -> string:equal(TestIdStart, InstId) end, - ok = check_topic_status(Hosts, KafkaConfig, KafkaTopic), + ok = check_topic_status(Hosts, ClientConfig, KafkaTopic), ok = check_if_healthy_leaders(ClientId, KafkaTopic), WolffProducerConfig = producers_config( BridgeType, BridgeName, ClientId, KafkaConfig, IsDryRun, BridgeV2Id @@ -488,15 +492,16 @@ on_get_channel_status( #{ client_id := ClientId, hosts := Hosts, + client_config := ClientConfig, installed_bridge_v2s := Channels } = _State ) -> - ChannelState = maps:get(ChannelId, Channels), + #{kafka_topic := KafkaTopic} = maps:get(ChannelId, Channels), case wolff_client_sup:find_client(ClientId) of {ok, Pid} -> case wolff_client:check_connectivity(Pid) of ok -> - try check_leaders_and_topic(Pid, Hosts, ChannelState) of + try check_leaders_and_topic(ClientId, Pid, Hosts, ClientConfig, KafkaTopic) of ok -> connected catch @@ -511,19 +516,31 @@ on_get_channel_status( end. check_leaders_and_topic( - Client, + ClientId, + ClientPid, Hosts, - #{ - kafka_config := KafkaConfig, - kafka_topic := KafkaTopic - } = _ChannelState + ClientConfig, + KafkaTopic ) -> - check_if_healthy_leaders(Client, KafkaTopic), - check_topic_status(Hosts, KafkaConfig, KafkaTopic). + check_topic_status(Hosts, ClientConfig, KafkaTopic), + do_check_if_healthy_leaders(ClientId, ClientPid, KafkaTopic). -check_if_healthy_leaders(Client, KafkaTopic) when is_pid(Client) -> +check_if_healthy_leaders(ClientId, KafkaTopic) when is_binary(ClientId) -> + case wolff_client_sup:find_client(ClientId) of + {ok, Pid} -> + do_check_if_healthy_leaders(ClientId, Pid, KafkaTopic); + {error, Reason} -> + throw(#{ + error => cannot_find_kafka_client, + reason => Reason, + kafka_client => ClientId, + kafka_topic => KafkaTopic + }) + end. + +do_check_if_healthy_leaders(ClientId, ClientPid, KafkaTopic) when is_pid(ClientPid) -> Leaders = - case wolff_client:get_leader_connections(Client, KafkaTopic) of + case wolff_client:get_leader_connections(ClientPid, KafkaTopic) of {ok, LeadersToCheck} -> %% Kafka is considered healthy as long as any of the partition leader is reachable. lists:filtermap( @@ -540,50 +557,34 @@ check_if_healthy_leaders(Client, KafkaTopic) when is_pid(Client) -> end, case Leaders of [] -> - throw( - iolist_to_binary( - io_lib:format("Could not find any healthy partion leader for topic ~s", [ - KafkaTopic - ]) - ) - ); + throw(#{ + error => no_connected_partition_leader, + kafka_client => ClientId, + kafka_topic => KafkaTopic + }); _ -> ok - end; -check_if_healthy_leaders(ClientId, KafkaTopic) -> - case wolff_client_sup:find_client(ClientId) of - {ok, Pid} -> - check_if_healthy_leaders(Pid, KafkaTopic); - {error, _Reason} -> - throw(iolist_to_binary(io_lib:format("Could not find Kafka client: ~p", [ClientId]))) end. -check_topic_status(Hosts, KafkaConfig, KafkaTopic) -> - CheckTopicFun = - fun() -> - wolff_client:check_if_topic_exists(Hosts, KafkaConfig, KafkaTopic) - end, - try - case emqx_utils:nolink_apply(CheckTopicFun, 5_000) of - ok -> - ok; - {error, unknown_topic_or_partition} -> - throw( - iolist_to_binary(io_lib:format("Unknown topic or partition ~s", [KafkaTopic])) - ); - _ -> - ok - end - catch - error:_:_ -> - %% Some other error not related to unknown_topic_or_partition - ok +check_topic_status(Hosts, ClientConfig, KafkaTopic) -> + %% TODO: change to call wolff:check_if_topic_exists when type spec is fixed for this function + case wolff_client:check_if_topic_exists(Hosts, ClientConfig#{nolink => true}, KafkaTopic) of + ok -> + ok; + {error, unknown_topic_or_partition} -> + throw(#{error => unknown_kafka_topic, topic => KafkaTopic}); + {error, Reason} -> + throw(#{ + error => failed_to_check_topic_status, + reason => Reason, + kafka_topic => KafkaTopic + }) end. ssl(#{enable := true} = SSL) -> emqx_tls_lib:to_client_opts(SSL); ssl(_) -> - []. + false. producers_config(BridgeType, BridgeName, ClientId, Input, IsDryRun, BridgeV2Id) -> #{ diff --git a/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_producer_SUITE.erl b/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_producer_SUITE.erl index 30e8f90a1..36609f16a 100644 --- a/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_producer_SUITE.erl +++ b/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_producer_SUITE.erl @@ -168,17 +168,17 @@ t_publish_no_auth(CtConfig) -> t_publish_no_auth_key_dispatch(CtConfig) -> publish_with_and_without_ssl(CtConfig, "none", #{"partition_strategy" => "key_dispatch"}). -% t_publish_sasl_plain(CtConfig) -> -% publish_with_and_without_ssl(CtConfig, valid_sasl_plain_settings()). +t_publish_sasl_plain(CtConfig) -> + publish_with_and_without_ssl(CtConfig, valid_sasl_plain_settings()). -% t_publish_sasl_scram256(CtConfig) -> -% publish_with_and_without_ssl(CtConfig, valid_sasl_scram256_settings()). +t_publish_sasl_scram256(CtConfig) -> + publish_with_and_without_ssl(CtConfig, valid_sasl_scram256_settings()). -% t_publish_sasl_scram512(CtConfig) -> -% publish_with_and_without_ssl(CtConfig, valid_sasl_scram512_settings()). +t_publish_sasl_scram512(CtConfig) -> + publish_with_and_without_ssl(CtConfig, valid_sasl_scram512_settings()). -% t_publish_sasl_kerberos(CtConfig) -> -% publish_with_and_without_ssl(CtConfig, valid_sasl_kerberos_settings()). +t_publish_sasl_kerberos(CtConfig) -> + publish_with_and_without_ssl(CtConfig, valid_sasl_kerberos_settings()). %%------------------------------------------------------------------------------ %% Test cases for REST api @@ -187,20 +187,21 @@ t_publish_no_auth_key_dispatch(CtConfig) -> t_kafka_bridge_rest_api_plain_text(_CtConfig) -> kafka_bridge_rest_api_all_auth_methods(false). -% t_kafka_bridge_rest_api_ssl(_CtConfig) -> -% kafka_bridge_rest_api_all_auth_methods(true). +t_kafka_bridge_rest_api_ssl(_CtConfig) -> + kafka_bridge_rest_api_all_auth_methods(true). kafka_bridge_rest_api_all_auth_methods(UseSSL) -> + emqx_logger:set_log_level(debug), NormalHostsString = case UseSSL of true -> kafka_hosts_string_ssl(); false -> kafka_hosts_string() end, - % SASLHostsString = - % case UseSSL of - % true -> kafka_hosts_string_ssl_sasl(); - % false -> kafka_hosts_string_sasl() - % end, + SASLHostsString = + case UseSSL of + true -> kafka_hosts_string_ssl_sasl(); + false -> kafka_hosts_string_sasl() + end, BinifyMap = fun(Map) -> maps:from_list([ {erlang:iolist_to_binary(K), erlang:iolist_to_binary(V)} @@ -210,7 +211,7 @@ kafka_bridge_rest_api_all_auth_methods(UseSSL) -> SSLSettings = case UseSSL of true -> #{<<"ssl">> => BinifyMap(valid_ssl_settings())}; - false -> #{} + false -> #{<<"ssl">> => BinifyMap(#{"enable" => "false"})} end, kafka_bridge_rest_api_helper( maps:merge( @@ -221,42 +222,42 @@ kafka_bridge_rest_api_all_auth_methods(UseSSL) -> SSLSettings ) ), - % kafka_bridge_rest_api_helper( - % maps:merge( - % #{ - % <<"bootstrap_hosts">> => SASLHostsString, - % <<"authentication">> => BinifyMap(valid_sasl_plain_settings()) - % }, - % SSLSettings - % ) - % ), - % kafka_bridge_rest_api_helper( - % maps:merge( - % #{ - % <<"bootstrap_hosts">> => SASLHostsString, - % <<"authentication">> => BinifyMap(valid_sasl_scram256_settings()) - % }, - % SSLSettings - % ) - % ), - % kafka_bridge_rest_api_helper( - % maps:merge( - % #{ - % <<"bootstrap_hosts">> => SASLHostsString, - % <<"authentication">> => BinifyMap(valid_sasl_scram512_settings()) - % }, - % SSLSettings - % ) - % ), - % kafka_bridge_rest_api_helper( - % maps:merge( - % #{ - % <<"bootstrap_hosts">> => SASLHostsString, - % <<"authentication">> => BinifyMap(valid_sasl_kerberos_settings()) - % }, - % SSLSettings - % ) - % ), + kafka_bridge_rest_api_helper( + maps:merge( + #{ + <<"bootstrap_hosts">> => SASLHostsString, + <<"authentication">> => BinifyMap(valid_sasl_plain_settings()) + }, + SSLSettings + ) + ), + kafka_bridge_rest_api_helper( + maps:merge( + #{ + <<"bootstrap_hosts">> => SASLHostsString, + <<"authentication">> => BinifyMap(valid_sasl_scram256_settings()) + }, + SSLSettings + ) + ), + kafka_bridge_rest_api_helper( + maps:merge( + #{ + <<"bootstrap_hosts">> => SASLHostsString, + <<"authentication">> => BinifyMap(valid_sasl_scram512_settings()) + }, + SSLSettings + ) + ), + kafka_bridge_rest_api_helper( + maps:merge( + #{ + <<"bootstrap_hosts">> => SASLHostsString, + <<"authentication">> => BinifyMap(valid_sasl_kerberos_settings()) + }, + SSLSettings + ) + ), ok. %% So that we can check if new atoms are created when they are not supposed to be created @@ -328,11 +329,7 @@ kafka_bridge_rest_api_helper(Config) -> } } }, - CreateBody = - case maps:is_key(<<"ssl">>, Config) of - true -> CreateBodyTmp#{<<"ssl">> => maps:get(<<"ssl">>, Config)}; - false -> CreateBodyTmp - end, + CreateBody = CreateBodyTmp#{<<"ssl">> => maps:get(<<"ssl">>, Config)}, {ok, 201, _Data} = http_post(BridgesParts, CreateBody), %% Check that the new bridge is in the list of bridges true = MyKafkaBridgeExists(),