fix(kafka): avoid producer name collision between Kafka and AEH bridges

Fixes https://emqx.atlassian.net/browse/EMQX-10860
This commit is contained in:
Paulo Zulato 2023-08-29 19:26:58 -03:00
parent 1cab687153
commit ee77976424
8 changed files with 49 additions and 57 deletions

View File

@ -175,14 +175,15 @@ create(BridgeId, Conf) ->
create(Type, Name, Conf) ->
create(Type, Name, Conf, #{}).
create(Type, Name, Conf, Opts) ->
create(Type, Name, Conf0, Opts) ->
?SLOG(info, #{
msg => "create bridge",
type => Type,
name => Name,
config => emqx_utils:redact(Conf)
config => emqx_utils:redact(Conf0)
}),
TypeBin = bin(Type),
Conf = Conf0#{bridge_type => TypeBin, bridge_name => Name},
{ok, _Data} = emqx_resource:create_local(
resource_id(Type, Name),
<<"emqx_bridge">>,
@ -249,8 +250,9 @@ recreate(Type, Name) ->
recreate(Type, Name, Conf) ->
recreate(Type, Name, Conf, #{}).
recreate(Type, Name, Conf, Opts) ->
recreate(Type, Name, Conf0, Opts) ->
TypeBin = bin(Type),
Conf = Conf0#{bridge_type => TypeBin, bridge_name => Name},
emqx_resource:recreate_local(
resource_id(Type, Name),
bridge_to_resource_type(Type),
@ -267,17 +269,18 @@ create_dry_run(Type, Conf0) ->
Conf1 = maps:without([<<"name">>], Conf0),
RawConf = #{<<"bridges">> => #{TypeBin => #{<<"temp_name">> => Conf1}}},
try
#{bridges := #{TypeAtom := #{temp_name := Conf}}} =
#{bridges := #{TypeAtom := #{temp_name := Conf2}}} =
hocon_tconf:check_plain(
emqx_bridge_schema,
RawConf,
#{atom_key => true, required => false}
),
Conf = Conf2#{bridge_type => TypeBin, bridge_name => TmpName},
case emqx_connector_ssl:convert_certs(TmpPath, Conf) of
{error, Reason} ->
{error, Reason};
{ok, ConfNew} ->
ParseConf = parse_confs(bin(Type), TmpName, ConfNew),
ParseConf = parse_confs(TypeBin, TmpName, ConfNew),
emqx_resource:create_dry_run_local(bridge_to_resource_type(Type), ParseConf)
end
catch
@ -387,15 +390,7 @@ parse_confs(Type, Name, Conf) when ?IS_INGRESS_BRIDGE(Type) ->
%% receives a message from the external database.
BId = bridge_id(Type, Name),
BridgeHookpoint = bridge_hookpoint(BId),
Conf#{hookpoint => BridgeHookpoint, bridge_name => Name};
%% TODO: rename this to `kafka_producer' after alias support is added
%% to hocon; keeping this as just `kafka' for backwards compatibility.
parse_confs(<<"kafka">> = _Type, Name, Conf) ->
Conf#{bridge_name => Name};
parse_confs(<<"pulsar_producer">> = _Type, Name, Conf) ->
Conf#{bridge_name => Name};
parse_confs(<<"kinesis_producer">> = _Type, Name, Conf) ->
Conf#{bridge_name => Name};
Conf#{hookpoint => BridgeHookpoint};
parse_confs(BridgeType, BridgeName, Config) ->
connector_config(BridgeType, BridgeName, Config).

View File

@ -166,14 +166,14 @@ values(producer) ->
%% `emqx_bridge_resource' API
%%-------------------------------------------------------------------------------------------------
connector_config(Config, BridgeName) ->
connector_config(Config, _BridgeName) ->
%% Default port for AEH is 9093
BootstrapHosts0 = maps:get(bootstrap_hosts, Config),
BootstrapHosts = emqx_schema:parse_servers(
BootstrapHosts0,
emqx_bridge_azure_event_hub:host_opts()
),
Config#{bridge_name => BridgeName, bootstrap_hosts := BootstrapHosts}.
Config#{bootstrap_hosts := BootstrapHosts}.
%%-------------------------------------------------------------------------------------------------
%% Internal fns

View File

@ -282,6 +282,7 @@ t_same_name_azure_kafka_bridges(AehConfig) ->
ConfigKafka = lists:keyreplace(bridge_type, 1, AehConfig, {bridge_type, ?KAFKA_BRIDGE_TYPE}),
BridgeName = ?config(bridge_name, AehConfig),
AehResourceId = emqx_bridge_testlib:resource_id(AehConfig),
KafkaResourceId = emqx_bridge_testlib:resource_id(ConfigKafka),
TracePoint = emqx_bridge_kafka_impl_producer_sync_query,
%% creates the AEH bridge and check it's working
ok = emqx_bridge_testlib:t_sync_query(
@ -292,6 +293,9 @@ t_same_name_azure_kafka_bridges(AehConfig) ->
),
%% than creates a Kafka bridge with same name and delete it after creation
ok = emqx_bridge_testlib:t_create_via_http(ConfigKafka),
%% check that both bridges are healthy
?assertEqual({ok, connected}, emqx_resource_manager:health_check(AehResourceId)),
?assertEqual({ok, connected}, emqx_resource_manager:health_check(KafkaResourceId)),
?assertMatch(
{{ok, _}, {ok, _}},
?wait_async_action(

View File

@ -7,7 +7,7 @@
-export([
hosts/1,
make_client_id/1,
make_client_id/2,
sasl/1,
socket_opts/1
]).
@ -24,11 +24,10 @@ hosts(Hosts) when is_list(Hosts) ->
kpro:parse_endpoints(Hosts).
%% Client ID is better to be unique to make it easier for Kafka side trouble shooting.
make_client_id(InstanceId) ->
InstanceIdBin0 = to_bin(InstanceId),
% Removing the <<"bridge:">> from beginning for backward compatibility
InstanceIdBin = binary:replace(InstanceIdBin0, <<"bridge:">>, <<>>),
iolist_to_binary([InstanceIdBin, ":", atom_to_list(node())]).
make_client_id(BridgeType0, BridgeName0) ->
BridgeType = to_bin(BridgeType0),
BridgeName = to_bin(BridgeName0),
iolist_to_binary([BridgeType, ":", BridgeName, ":", atom_to_list(node())]).
sasl(none) ->
undefined;

View File

@ -121,6 +121,8 @@ on_start(ResourceId, Config) ->
#{
authentication := Auth,
bootstrap_hosts := BootstrapHosts0,
bridge_type := BridgeType,
bridge_name := BridgeName,
hookpoint := _,
kafka := #{
max_batch_bytes := _,
@ -134,7 +136,7 @@ on_start(ResourceId, Config) ->
} = Config,
BootstrapHosts = emqx_bridge_kafka_impl:hosts(BootstrapHosts0),
%% Note: this is distinct per node.
ClientID = make_client_id(ResourceId),
ClientID = make_client_id(ResourceId, BridgeType, BridgeName),
ClientOpts0 =
case Auth of
none -> [];
@ -515,11 +517,11 @@ is_dry_run(ResourceId) ->
string:equal(TestIdStart, ResourceId)
end.
-spec make_client_id(resource_id()) -> atom().
make_client_id(InstanceId) ->
case is_dry_run(InstanceId) of
-spec make_client_id(resource_id(), binary(), atom() | binary()) -> atom().
make_client_id(ResourceId, BridgeType, BridgeName) ->
case is_dry_run(ResourceId) of
false ->
ClientID0 = emqx_bridge_kafka_impl:make_client_id(InstanceId),
ClientID0 = emqx_bridge_kafka_impl:make_client_id(BridgeType, BridgeName),
binary_to_atom(ClientID0);
true ->
%% It is a dry run and we don't want to leak too many

View File

@ -29,10 +29,6 @@
-define(kafka_client_id, kafka_client_id).
-define(kafka_producers, kafka_producers).
%% TODO: rename this to `kafka_producer' after alias support is added
%% to hocon; keeping this as just `kafka' for backwards compatibility.
-define(BRIDGE_TYPE, kafka).
query_mode(#{kafka := #{query_mode := sync}}) ->
simple_sync;
query_mode(_) ->
@ -46,6 +42,7 @@ on_start(InstId, Config) ->
authentication := Auth,
bootstrap_hosts := Hosts0,
bridge_name := BridgeName,
bridge_type := BridgeType,
connect_timeout := ConnTimeout,
kafka := KafkaConfig = #{
message := MessageTemplate,
@ -60,12 +57,11 @@ on_start(InstId, Config) ->
KafkaHeadersTokens = preproc_kafka_headers(maps:get(kafka_headers, KafkaConfig, undefined)),
KafkaExtHeadersTokens = preproc_ext_headers(maps:get(kafka_ext_headers, KafkaConfig, [])),
KafkaHeadersValEncodeMode = maps:get(kafka_header_value_encode_mode, KafkaConfig, none),
BridgeType = ?BRIDGE_TYPE,
ResourceId = emqx_bridge_resource:resource_id(BridgeType, BridgeName),
ok = emqx_resource:allocate_resource(InstId, ?kafka_resource_id, ResourceId),
_ = maybe_install_wolff_telemetry_handlers(ResourceId),
Hosts = emqx_bridge_kafka_impl:hosts(Hosts0),
ClientId = emqx_bridge_kafka_impl:make_client_id(InstId),
ClientId = emqx_bridge_kafka_impl:make_client_id(BridgeType, BridgeName),
ok = emqx_resource:allocate_resource(InstId, ?kafka_client_id, ClientId),
ClientConfig = #{
min_metadata_refresh_interval => MinMetaRefreshInterval,
@ -107,7 +103,7 @@ on_start(InstId, Config) ->
_ ->
string:equal(TestIdStart, InstId)
end,
WolffProducerConfig = producers_config(BridgeName, ClientId, KafkaConfig, IsDryRun),
WolffProducerConfig = producers_config(BridgeType, BridgeName, ClientId, KafkaConfig, IsDryRun),
case wolff:ensure_supervised_producers(ClientId, KafkaTopic, WolffProducerConfig) of
{ok, Producers} ->
ok = emqx_resource:allocate_resource(InstId, ?kafka_producers, Producers),
@ -462,7 +458,7 @@ ssl(#{enable := true} = SSL) ->
ssl(_) ->
[].
producers_config(BridgeName, ClientId, Input, IsDryRun) ->
producers_config(BridgeType, BridgeName, ClientId, Input, IsDryRun) ->
#{
max_batch_bytes := MaxBatchBytes,
compression := Compression,
@ -488,10 +484,9 @@ producers_config(BridgeName, ClientId, Input, IsDryRun) ->
disk -> {false, replayq_dir(ClientId)};
hybrid -> {true, replayq_dir(ClientId)}
end,
BridgeType = ?BRIDGE_TYPE,
ResourceID = emqx_bridge_resource:resource_id(BridgeType, BridgeName),
#{
name => make_producer_name(BridgeName, IsDryRun),
name => make_producer_name(BridgeType, BridgeName, IsDryRun),
partitioner => partitioner(PartitionStrategy),
partition_count_refresh_interval_seconds => PCntRefreshInterval,
replayq_dir => ReplayqDir,
@ -516,20 +511,15 @@ replayq_dir(ClientId) ->
%% Producer name must be an atom which will be used as a ETS table name for
%% partition worker lookup.
make_producer_name(BridgeName, IsDryRun) when is_atom(BridgeName) ->
make_producer_name(atom_to_list(BridgeName), IsDryRun);
make_producer_name(BridgeName, IsDryRun) ->
make_producer_name(_BridgeType, _BridgeName, true = _IsDryRun) ->
%% It is a dry run and we don't want to leak too many atoms
%% so we use the default producer name instead of creating
%% an unique name.
probing_wolff_producers;
make_producer_name(BridgeType, BridgeName, _IsDryRun) ->
%% Woff needs an atom for ets table name registration. The assumption here is
%% that bridges with new names are not often created.
case IsDryRun of
true ->
%% It is a dry run and we don't want to leak too many atoms
%% so we use the default producer name instead of creating
%% an unique name.
probing_wolff_producers;
false ->
binary_to_atom(iolist_to_binary(["kafka_producer_", BridgeName]))
end.
binary_to_atom(iolist_to_binary([BridgeType, "_", bin(BridgeName)])).
with_log_at_error(Fun, Log) ->
try

View File

@ -66,7 +66,7 @@ only_once_tests() ->
].
init_per_suite(Config) ->
Config.
[{bridge_type, <<"kafka_consumer">>} | Config].
end_per_suite(_Config) ->
emqx_mgmt_api_test_util:end_suite(),
@ -898,8 +898,9 @@ ensure_connected(Config) ->
ok.
consumer_clientid(Config) ->
ResourceId = resource_id(Config),
binary_to_atom(emqx_bridge_kafka_impl:make_client_id(ResourceId)).
BridgeType = ?config(bridge_type, Config),
KafkaName = ?config(kafka_name, Config),
binary_to_atom(emqx_bridge_kafka_impl:make_client_id(BridgeType, KafkaName)).
get_client_connection(Config) ->
KafkaHost = ?config(kafka_host, Config),

View File

@ -40,6 +40,7 @@
%% TODO: rename this to `kafka_producer' after alias support is added
%% to hocon; keeping this as just `kafka' for backwards compatibility.
-define(BRIDGE_TYPE, "kafka").
-define(BRIDGE_TYPE_BIN, <<"kafka">>).
-define(APPS, [emqx_resource, emqx_bridge, emqx_rule_engine, emqx_bridge_kafka]).
@ -438,7 +439,7 @@ t_failed_creation_then_fix(Config) ->
{ok, #{config := WrongConfigAtom1}} = emqx_bridge:create(
Type, erlang:list_to_atom(Name), WrongConf
),
WrongConfigAtom = WrongConfigAtom1#{bridge_name => Name},
WrongConfigAtom = WrongConfigAtom1#{bridge_name => Name, bridge_type => ?BRIDGE_TYPE_BIN},
?assertThrow(Reason when is_list(Reason), ?PRODUCER:on_start(ResourceId, WrongConfigAtom)),
%% before throwing, it should cleanup the client process. we
%% retry because the supervisor might need some time to really
@ -448,7 +449,7 @@ t_failed_creation_then_fix(Config) ->
{ok, #{config := ValidConfigAtom1}} = emqx_bridge:create(
Type, erlang:list_to_atom(Name), ValidConf
),
ValidConfigAtom = ValidConfigAtom1#{bridge_name => Name},
ValidConfigAtom = ValidConfigAtom1#{bridge_name => Name, bridge_type => ?BRIDGE_TYPE_BIN},
{ok, State} = ?PRODUCER:on_start(ResourceId, ValidConfigAtom),
Time = erlang:unique_integer(),
BinTime = integer_to_binary(Time),
@ -540,7 +541,7 @@ t_nonexistent_topic(_Config) ->
{ok, #{config := ValidConfigAtom1}} = emqx_bridge:create(
Type, erlang:list_to_atom(Name), Conf
),
ValidConfigAtom = ValidConfigAtom1#{bridge_name => Name},
ValidConfigAtom = ValidConfigAtom1#{bridge_name => Name, bridge_type => ?BRIDGE_TYPE_BIN},
?assertThrow(_, ?PRODUCER:on_start(ResourceId, ValidConfigAtom)),
ok = emqx_bridge_resource:remove(BridgeId),
delete_all_bridges(),
@ -585,7 +586,7 @@ t_send_message_with_headers(Config) ->
{ok, #{config := ConfigAtom1}} = emqx_bridge:create(
Type, erlang:list_to_atom(Name), Conf
),
ConfigAtom = ConfigAtom1#{bridge_name => Name},
ConfigAtom = ConfigAtom1#{bridge_name => Name, bridge_type => ?BRIDGE_TYPE_BIN},
{ok, State} = ?PRODUCER:on_start(ResourceId, ConfigAtom),
Time1 = erlang:unique_integer(),
BinTime1 = integer_to_binary(Time1),
@ -807,7 +808,7 @@ t_wrong_headers_from_message(Config) ->
{ok, #{config := ConfigAtom1}} = emqx_bridge:create(
Type, erlang:list_to_atom(Name), Conf
),
ConfigAtom = ConfigAtom1#{bridge_name => Name},
ConfigAtom = ConfigAtom1#{bridge_name => Name, bridge_type => ?BRIDGE_TYPE_BIN},
{ok, State} = ?PRODUCER:on_start(ResourceId, ConfigAtom),
Time1 = erlang:unique_integer(),
Payload1 = <<"wrong_header">>,