fix(kafka): use client config for topic existence check

Prior to this fix, Kafka producer config was used as client config
This commit is contained in:
Zaiming (Stone) Shi 2023-10-27 07:45:26 +02:00
parent f2c9739ce2
commit 5f17a8f2ce
2 changed files with 108 additions and 110 deletions

View File

@ -102,6 +102,7 @@ on_start(InstId, Config) ->
client_id => ClientId,
resource_id => ResourceId,
hosts => Hosts,
client_config => ClientConfig,
installed_bridge_v2s => #{}
}}.
@ -110,6 +111,7 @@ on_add_channel(
#{
client_id := ClientId,
hosts := Hosts,
client_config := ClientConfig,
installed_bridge_v2s := InstalledBridgeV2s
} = OldState,
BridgeV2Id,
@ -117,7 +119,7 @@ on_add_channel(
) ->
%% The following will throw an exception if the bridge producers fails to start
{ok, BridgeV2State} = create_producers_for_bridge_v2(
InstId, BridgeV2Id, ClientId, Hosts, BridgeV2Config
InstId, BridgeV2Id, ClientId, Hosts, ClientConfig, BridgeV2Config
),
NewInstalledBridgeV2s = maps:put(BridgeV2Id, BridgeV2State, InstalledBridgeV2s),
%% Update state
@ -129,15 +131,17 @@ create_producers_for_bridge_v2(
BridgeV2Id,
ClientId,
Hosts,
ClientConfig,
#{
bridge_type := BridgeType,
kafka := #{
message := MessageTemplate,
topic := KafkaTopic,
sync_query_timeout := SyncQueryTimeout
} = KafkaConfig
kafka := KafkaConfig
}
) ->
#{
message := MessageTemplate,
topic := KafkaTopic,
sync_query_timeout := SyncQueryTimeout
} = KafkaConfig,
KafkaHeadersTokens = preproc_kafka_headers(maps:get(kafka_headers, KafkaConfig, undefined)),
KafkaExtHeadersTokens = preproc_ext_headers(maps:get(kafka_ext_headers, KafkaConfig, [])),
KafkaHeadersValEncodeMode = maps:get(kafka_header_value_encode_mode, KafkaConfig, none),
@ -150,7 +154,7 @@ create_producers_for_bridge_v2(
_ ->
string:equal(TestIdStart, InstId)
end,
ok = check_topic_status(Hosts, KafkaConfig, KafkaTopic),
ok = check_topic_status(Hosts, ClientConfig, KafkaTopic),
ok = check_if_healthy_leaders(ClientId, KafkaTopic),
WolffProducerConfig = producers_config(
BridgeType, BridgeName, ClientId, KafkaConfig, IsDryRun, BridgeV2Id
@ -488,15 +492,16 @@ on_get_channel_status(
#{
client_id := ClientId,
hosts := Hosts,
client_config := ClientConfig,
installed_bridge_v2s := Channels
} = _State
) ->
ChannelState = maps:get(ChannelId, Channels),
#{kafka_topic := KafkaTopic} = maps:get(ChannelId, Channels),
case wolff_client_sup:find_client(ClientId) of
{ok, Pid} ->
case wolff_client:check_connectivity(Pid) of
ok ->
try check_leaders_and_topic(Pid, Hosts, ChannelState) of
try check_leaders_and_topic(ClientId, Pid, Hosts, ClientConfig, KafkaTopic) of
ok ->
connected
catch
@ -511,19 +516,31 @@ on_get_channel_status(
end.
check_leaders_and_topic(
Client,
ClientId,
ClientPid,
Hosts,
#{
kafka_config := KafkaConfig,
kafka_topic := KafkaTopic
} = _ChannelState
ClientConfig,
KafkaTopic
) ->
check_if_healthy_leaders(Client, KafkaTopic),
check_topic_status(Hosts, KafkaConfig, KafkaTopic).
check_topic_status(Hosts, ClientConfig, KafkaTopic),
do_check_if_healthy_leaders(ClientId, ClientPid, KafkaTopic).
check_if_healthy_leaders(Client, KafkaTopic) when is_pid(Client) ->
check_if_healthy_leaders(ClientId, KafkaTopic) when is_binary(ClientId) ->
case wolff_client_sup:find_client(ClientId) of
{ok, Pid} ->
do_check_if_healthy_leaders(ClientId, Pid, KafkaTopic);
{error, Reason} ->
throw(#{
error => cannot_find_kafka_client,
reason => Reason,
kafka_client => ClientId,
kafka_topic => KafkaTopic
})
end.
do_check_if_healthy_leaders(ClientId, ClientPid, KafkaTopic) when is_pid(ClientPid) ->
Leaders =
case wolff_client:get_leader_connections(Client, KafkaTopic) of
case wolff_client:get_leader_connections(ClientPid, KafkaTopic) of
{ok, LeadersToCheck} ->
%% Kafka is considered healthy as long as any of the partition leader is reachable.
lists:filtermap(
@ -540,50 +557,34 @@ check_if_healthy_leaders(Client, KafkaTopic) when is_pid(Client) ->
end,
case Leaders of
[] ->
throw(
iolist_to_binary(
io_lib:format("Could not find any healthy partion leader for topic ~s", [
KafkaTopic
])
)
);
throw(#{
error => no_connected_partition_leader,
kafka_client => ClientId,
kafka_topic => KafkaTopic
});
_ ->
ok
end;
check_if_healthy_leaders(ClientId, KafkaTopic) ->
case wolff_client_sup:find_client(ClientId) of
{ok, Pid} ->
check_if_healthy_leaders(Pid, KafkaTopic);
{error, _Reason} ->
throw(iolist_to_binary(io_lib:format("Could not find Kafka client: ~p", [ClientId])))
end.
check_topic_status(Hosts, KafkaConfig, KafkaTopic) ->
CheckTopicFun =
fun() ->
wolff_client:check_if_topic_exists(Hosts, KafkaConfig, KafkaTopic)
end,
try
case emqx_utils:nolink_apply(CheckTopicFun, 5_000) of
ok ->
ok;
{error, unknown_topic_or_partition} ->
throw(
iolist_to_binary(io_lib:format("Unknown topic or partition ~s", [KafkaTopic]))
);
_ ->
ok
end
catch
error:_:_ ->
%% Some other error not related to unknown_topic_or_partition
ok
check_topic_status(Hosts, ClientConfig, KafkaTopic) ->
%% TODO: change to call wolff:check_if_topic_exists when type spec is fixed for this function
case wolff_client:check_if_topic_exists(Hosts, ClientConfig#{nolink => true}, KafkaTopic) of
ok ->
ok;
{error, unknown_topic_or_partition} ->
throw(#{error => unknown_kafka_topic, topic => KafkaTopic});
{error, Reason} ->
throw(#{
error => failed_to_check_topic_status,
reason => Reason,
kafka_topic => KafkaTopic
})
end.
ssl(#{enable := true} = SSL) ->
emqx_tls_lib:to_client_opts(SSL);
ssl(_) ->
[].
false.
producers_config(BridgeType, BridgeName, ClientId, Input, IsDryRun, BridgeV2Id) ->
#{

View File

@ -168,17 +168,17 @@ t_publish_no_auth(CtConfig) ->
t_publish_no_auth_key_dispatch(CtConfig) ->
publish_with_and_without_ssl(CtConfig, "none", #{"partition_strategy" => "key_dispatch"}).
% t_publish_sasl_plain(CtConfig) ->
% publish_with_and_without_ssl(CtConfig, valid_sasl_plain_settings()).
t_publish_sasl_plain(CtConfig) ->
publish_with_and_without_ssl(CtConfig, valid_sasl_plain_settings()).
% t_publish_sasl_scram256(CtConfig) ->
% publish_with_and_without_ssl(CtConfig, valid_sasl_scram256_settings()).
t_publish_sasl_scram256(CtConfig) ->
publish_with_and_without_ssl(CtConfig, valid_sasl_scram256_settings()).
% t_publish_sasl_scram512(CtConfig) ->
% publish_with_and_without_ssl(CtConfig, valid_sasl_scram512_settings()).
t_publish_sasl_scram512(CtConfig) ->
publish_with_and_without_ssl(CtConfig, valid_sasl_scram512_settings()).
% t_publish_sasl_kerberos(CtConfig) ->
% publish_with_and_without_ssl(CtConfig, valid_sasl_kerberos_settings()).
t_publish_sasl_kerberos(CtConfig) ->
publish_with_and_without_ssl(CtConfig, valid_sasl_kerberos_settings()).
%%------------------------------------------------------------------------------
%% Test cases for REST api
@ -187,20 +187,21 @@ t_publish_no_auth_key_dispatch(CtConfig) ->
t_kafka_bridge_rest_api_plain_text(_CtConfig) ->
kafka_bridge_rest_api_all_auth_methods(false).
% t_kafka_bridge_rest_api_ssl(_CtConfig) ->
% kafka_bridge_rest_api_all_auth_methods(true).
t_kafka_bridge_rest_api_ssl(_CtConfig) ->
kafka_bridge_rest_api_all_auth_methods(true).
kafka_bridge_rest_api_all_auth_methods(UseSSL) ->
emqx_logger:set_log_level(debug),
NormalHostsString =
case UseSSL of
true -> kafka_hosts_string_ssl();
false -> kafka_hosts_string()
end,
% SASLHostsString =
% case UseSSL of
% true -> kafka_hosts_string_ssl_sasl();
% false -> kafka_hosts_string_sasl()
% end,
SASLHostsString =
case UseSSL of
true -> kafka_hosts_string_ssl_sasl();
false -> kafka_hosts_string_sasl()
end,
BinifyMap = fun(Map) ->
maps:from_list([
{erlang:iolist_to_binary(K), erlang:iolist_to_binary(V)}
@ -210,7 +211,7 @@ kafka_bridge_rest_api_all_auth_methods(UseSSL) ->
SSLSettings =
case UseSSL of
true -> #{<<"ssl">> => BinifyMap(valid_ssl_settings())};
false -> #{}
false -> #{<<"ssl">> => BinifyMap(#{"enable" => "false"})}
end,
kafka_bridge_rest_api_helper(
maps:merge(
@ -221,42 +222,42 @@ kafka_bridge_rest_api_all_auth_methods(UseSSL) ->
SSLSettings
)
),
% kafka_bridge_rest_api_helper(
% maps:merge(
% #{
% <<"bootstrap_hosts">> => SASLHostsString,
% <<"authentication">> => BinifyMap(valid_sasl_plain_settings())
% },
% SSLSettings
% )
% ),
% kafka_bridge_rest_api_helper(
% maps:merge(
% #{
% <<"bootstrap_hosts">> => SASLHostsString,
% <<"authentication">> => BinifyMap(valid_sasl_scram256_settings())
% },
% SSLSettings
% )
% ),
% kafka_bridge_rest_api_helper(
% maps:merge(
% #{
% <<"bootstrap_hosts">> => SASLHostsString,
% <<"authentication">> => BinifyMap(valid_sasl_scram512_settings())
% },
% SSLSettings
% )
% ),
% kafka_bridge_rest_api_helper(
% maps:merge(
% #{
% <<"bootstrap_hosts">> => SASLHostsString,
% <<"authentication">> => BinifyMap(valid_sasl_kerberos_settings())
% },
% SSLSettings
% )
% ),
kafka_bridge_rest_api_helper(
maps:merge(
#{
<<"bootstrap_hosts">> => SASLHostsString,
<<"authentication">> => BinifyMap(valid_sasl_plain_settings())
},
SSLSettings
)
),
kafka_bridge_rest_api_helper(
maps:merge(
#{
<<"bootstrap_hosts">> => SASLHostsString,
<<"authentication">> => BinifyMap(valid_sasl_scram256_settings())
},
SSLSettings
)
),
kafka_bridge_rest_api_helper(
maps:merge(
#{
<<"bootstrap_hosts">> => SASLHostsString,
<<"authentication">> => BinifyMap(valid_sasl_scram512_settings())
},
SSLSettings
)
),
kafka_bridge_rest_api_helper(
maps:merge(
#{
<<"bootstrap_hosts">> => SASLHostsString,
<<"authentication">> => BinifyMap(valid_sasl_kerberos_settings())
},
SSLSettings
)
),
ok.
%% So that we can check if new atoms are created when they are not supposed to be created
@ -328,11 +329,7 @@ kafka_bridge_rest_api_helper(Config) ->
}
}
},
CreateBody =
case maps:is_key(<<"ssl">>, Config) of
true -> CreateBodyTmp#{<<"ssl">> => maps:get(<<"ssl">>, Config)};
false -> CreateBodyTmp
end,
CreateBody = CreateBodyTmp#{<<"ssl">> => maps:get(<<"ssl">>, Config)},
{ok, 201, _Data} = http_post(BridgesParts, CreateBody),
%% Check that the new bridge is in the list of bridges
true = MyKafkaBridgeExists(),