Merge pull request #11516 from paulozulato/fix-kafka-aeh-client-unique-id

Fix Kafka/AEH ClientId uniqueness
This commit is contained in:
Paulo Zulato 2023-08-25 10:14:44 -03:00 committed by GitHub
commit cb1e105e19
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 50 additions and 15 deletions

View File

@ -12,6 +12,7 @@
-define(BRIDGE_TYPE, azure_event_hub_producer).
-define(BRIDGE_TYPE_BIN, <<"azure_event_hub_producer">>).
-define(KAFKA_BRIDGE_TYPE, kafka).
-define(APPS, [emqx_resource, emqx_bridge, emqx_rule_engine]).
-import(emqx_common_test_helpers, [on_exit/1]).
@ -276,3 +277,38 @@ t_sync_query(Config) ->
emqx_bridge_kafka_impl_producer_sync_query
),
ok.
t_same_name_azure_kafka_bridges(AehConfig) ->
ConfigKafka = lists:keyreplace(bridge_type, 1, AehConfig, {bridge_type, ?KAFKA_BRIDGE_TYPE}),
BridgeName = ?config(bridge_name, AehConfig),
AehResourceId = emqx_bridge_testlib:resource_id(AehConfig),
TracePoint = emqx_bridge_kafka_impl_producer_sync_query,
%% creates the AEH bridge and check it's working
ok = emqx_bridge_testlib:t_sync_query(
AehConfig,
fun make_message/0,
fun(Res) -> ?assertEqual(ok, Res) end,
TracePoint
),
%% than creates a Kafka bridge with same name and delete it after creation
ok = emqx_bridge_testlib:t_create_via_http(ConfigKafka),
?assertMatch(
{{ok, _}, {ok, _}},
?wait_async_action(
emqx_bridge:disable_enable(disable, ?KAFKA_BRIDGE_TYPE, BridgeName),
#{?snk_kind := kafka_producer_stopped},
5_000
)
),
% check that AEH bridge is still working
?check_trace(
begin
Message = {send_message, make_message()},
?assertEqual(ok, emqx_resource:simple_sync_query(AehResourceId, Message)),
ok
end,
fun(Trace) ->
?assertMatch([#{instance_id := AehResourceId}], ?of_kind(TracePoint, Trace))
end
),
ok.

View File

@ -7,7 +7,7 @@
-export([
hosts/1,
make_client_id/2,
make_client_id/1,
sasl/1,
socket_opts/1
]).
@ -24,10 +24,11 @@ hosts(Hosts) when is_list(Hosts) ->
kpro:parse_endpoints(Hosts).
%% Client ID is better to be unique to make it easier for Kafka side trouble shooting.
make_client_id(KafkaType0, BridgeName0) ->
KafkaType = to_bin(KafkaType0),
BridgeName = to_bin(BridgeName0),
iolist_to_binary([KafkaType, ":", BridgeName, ":", atom_to_list(node())]).
make_client_id(InstanceId) ->
InstanceIdBin0 = to_bin(InstanceId),
% Removing the <<"bridge:">> from beginning for backward compatibility
InstanceIdBin = binary:replace(InstanceIdBin0, <<"bridge:">>, <<>>),
iolist_to_binary([InstanceIdBin, ":", atom_to_list(node())]).
sasl(none) ->
undefined;

View File

@ -121,7 +121,6 @@ on_start(ResourceId, Config) ->
#{
authentication := Auth,
bootstrap_hosts := BootstrapHosts0,
bridge_name := BridgeName,
hookpoint := _,
kafka := #{
max_batch_bytes := _,
@ -134,9 +133,8 @@ on_start(ResourceId, Config) ->
topic_mapping := _
} = Config,
BootstrapHosts = emqx_bridge_kafka_impl:hosts(BootstrapHosts0),
KafkaType = kafka_consumer,
%% Note: this is distinct per node.
ClientID = make_client_id(ResourceId, KafkaType, BridgeName),
ClientID = make_client_id(ResourceId),
ClientOpts0 =
case Auth of
none -> [];
@ -517,11 +515,11 @@ is_dry_run(ResourceId) ->
string:equal(TestIdStart, ResourceId)
end.
-spec make_client_id(resource_id(), kafka_consumer, atom() | binary()) -> atom().
make_client_id(ResourceId, KafkaType, KafkaName) ->
case is_dry_run(ResourceId) of
-spec make_client_id(resource_id()) -> atom().
make_client_id(InstanceId) ->
case is_dry_run(InstanceId) of
false ->
ClientID0 = emqx_bridge_kafka_impl:make_client_id(KafkaType, KafkaName),
ClientID0 = emqx_bridge_kafka_impl:make_client_id(InstanceId),
binary_to_atom(ClientID0);
true ->
%% It is a dry run and we don't want to leak too many

View File

@ -65,7 +65,7 @@ on_start(InstId, Config) ->
ok = emqx_resource:allocate_resource(InstId, ?kafka_resource_id, ResourceId),
_ = maybe_install_wolff_telemetry_handlers(ResourceId),
Hosts = emqx_bridge_kafka_impl:hosts(Hosts0),
ClientId = emqx_bridge_kafka_impl:make_client_id(BridgeType, BridgeName),
ClientId = emqx_bridge_kafka_impl:make_client_id(InstId),
ok = emqx_resource:allocate_resource(InstId, ?kafka_client_id, ClientId),
ClientConfig = #{
min_metadata_refresh_interval => MinMetaRefreshInterval,

View File

@ -898,8 +898,8 @@ ensure_connected(Config) ->
ok.
consumer_clientid(Config) ->
KafkaName = ?config(kafka_name, Config),
binary_to_atom(emqx_bridge_kafka_impl:make_client_id(kafka_consumer, KafkaName)).
ResourceId = resource_id(Config),
binary_to_atom(emqx_bridge_kafka_impl:make_client_id(ResourceId)).
get_client_connection(Config) ->
KafkaHost = ?config(kafka_host, Config),