Merge pull request #11546 from paulozulato/fix-kafka-producer-unique-name
fix(kafka): avoid producer name collision between Kafka and AEH bridges
This commit is contained in:
commit
727fd296ee
|
@ -175,14 +175,15 @@ create(BridgeId, Conf) ->
|
|||
create(Type, Name, Conf) ->
|
||||
create(Type, Name, Conf, #{}).
|
||||
|
||||
create(Type, Name, Conf, Opts) ->
|
||||
create(Type, Name, Conf0, Opts) ->
|
||||
?SLOG(info, #{
|
||||
msg => "create bridge",
|
||||
type => Type,
|
||||
name => Name,
|
||||
config => emqx_utils:redact(Conf)
|
||||
config => emqx_utils:redact(Conf0)
|
||||
}),
|
||||
TypeBin = bin(Type),
|
||||
Conf = Conf0#{bridge_type => TypeBin, bridge_name => Name},
|
||||
{ok, _Data} = emqx_resource:create_local(
|
||||
resource_id(Type, Name),
|
||||
<<"emqx_bridge">>,
|
||||
|
@ -249,8 +250,9 @@ recreate(Type, Name) ->
|
|||
recreate(Type, Name, Conf) ->
|
||||
recreate(Type, Name, Conf, #{}).
|
||||
|
||||
recreate(Type, Name, Conf, Opts) ->
|
||||
recreate(Type, Name, Conf0, Opts) ->
|
||||
TypeBin = bin(Type),
|
||||
Conf = Conf0#{bridge_type => TypeBin, bridge_name => Name},
|
||||
emqx_resource:recreate_local(
|
||||
resource_id(Type, Name),
|
||||
bridge_to_resource_type(Type),
|
||||
|
@ -267,17 +269,18 @@ create_dry_run(Type, Conf0) ->
|
|||
Conf1 = maps:without([<<"name">>], Conf0),
|
||||
RawConf = #{<<"bridges">> => #{TypeBin => #{<<"temp_name">> => Conf1}}},
|
||||
try
|
||||
#{bridges := #{TypeAtom := #{temp_name := Conf}}} =
|
||||
#{bridges := #{TypeAtom := #{temp_name := Conf2}}} =
|
||||
hocon_tconf:check_plain(
|
||||
emqx_bridge_schema,
|
||||
RawConf,
|
||||
#{atom_key => true, required => false}
|
||||
),
|
||||
Conf = Conf2#{bridge_type => TypeBin, bridge_name => TmpName},
|
||||
case emqx_connector_ssl:convert_certs(TmpPath, Conf) of
|
||||
{error, Reason} ->
|
||||
{error, Reason};
|
||||
{ok, ConfNew} ->
|
||||
ParseConf = parse_confs(bin(Type), TmpName, ConfNew),
|
||||
ParseConf = parse_confs(TypeBin, TmpName, ConfNew),
|
||||
emqx_resource:create_dry_run_local(bridge_to_resource_type(Type), ParseConf)
|
||||
end
|
||||
catch
|
||||
|
@ -387,15 +390,7 @@ parse_confs(Type, Name, Conf) when ?IS_INGRESS_BRIDGE(Type) ->
|
|||
%% receives a message from the external database.
|
||||
BId = bridge_id(Type, Name),
|
||||
BridgeHookpoint = bridge_hookpoint(BId),
|
||||
Conf#{hookpoint => BridgeHookpoint, bridge_name => Name};
|
||||
%% TODO: rename this to `kafka_producer' after alias support is added
|
||||
%% to hocon; keeping this as just `kafka' for backwards compatibility.
|
||||
parse_confs(<<"kafka">> = _Type, Name, Conf) ->
|
||||
Conf#{bridge_name => Name};
|
||||
parse_confs(<<"pulsar_producer">> = _Type, Name, Conf) ->
|
||||
Conf#{bridge_name => Name};
|
||||
parse_confs(<<"kinesis_producer">> = _Type, Name, Conf) ->
|
||||
Conf#{bridge_name => Name};
|
||||
Conf#{hookpoint => BridgeHookpoint};
|
||||
parse_confs(BridgeType, BridgeName, Config) ->
|
||||
connector_config(BridgeType, BridgeName, Config).
|
||||
|
||||
|
|
|
@ -166,14 +166,14 @@ values(producer) ->
|
|||
%% `emqx_bridge_resource' API
|
||||
%%-------------------------------------------------------------------------------------------------
|
||||
|
||||
connector_config(Config, BridgeName) ->
|
||||
connector_config(Config, _BridgeName) ->
|
||||
%% Default port for AEH is 9093
|
||||
BootstrapHosts0 = maps:get(bootstrap_hosts, Config),
|
||||
BootstrapHosts = emqx_schema:parse_servers(
|
||||
BootstrapHosts0,
|
||||
emqx_bridge_azure_event_hub:host_opts()
|
||||
),
|
||||
Config#{bridge_name => BridgeName, bootstrap_hosts := BootstrapHosts}.
|
||||
Config#{bootstrap_hosts := BootstrapHosts}.
|
||||
|
||||
%%-------------------------------------------------------------------------------------------------
|
||||
%% Internal fns
|
||||
|
|
|
@ -282,6 +282,7 @@ t_same_name_azure_kafka_bridges(AehConfig) ->
|
|||
ConfigKafka = lists:keyreplace(bridge_type, 1, AehConfig, {bridge_type, ?KAFKA_BRIDGE_TYPE}),
|
||||
BridgeName = ?config(bridge_name, AehConfig),
|
||||
AehResourceId = emqx_bridge_testlib:resource_id(AehConfig),
|
||||
KafkaResourceId = emqx_bridge_testlib:resource_id(ConfigKafka),
|
||||
TracePoint = emqx_bridge_kafka_impl_producer_sync_query,
|
||||
%% creates the AEH bridge and check it's working
|
||||
ok = emqx_bridge_testlib:t_sync_query(
|
||||
|
@ -292,6 +293,9 @@ t_same_name_azure_kafka_bridges(AehConfig) ->
|
|||
),
|
||||
%% than creates a Kafka bridge with same name and delete it after creation
|
||||
ok = emqx_bridge_testlib:t_create_via_http(ConfigKafka),
|
||||
%% check that both bridges are healthy
|
||||
?assertEqual({ok, connected}, emqx_resource_manager:health_check(AehResourceId)),
|
||||
?assertEqual({ok, connected}, emqx_resource_manager:health_check(KafkaResourceId)),
|
||||
?assertMatch(
|
||||
{{ok, _}, {ok, _}},
|
||||
?wait_async_action(
|
||||
|
|
|
@ -7,7 +7,7 @@
|
|||
|
||||
-export([
|
||||
hosts/1,
|
||||
make_client_id/1,
|
||||
make_client_id/2,
|
||||
sasl/1,
|
||||
socket_opts/1
|
||||
]).
|
||||
|
@ -24,11 +24,10 @@ hosts(Hosts) when is_list(Hosts) ->
|
|||
kpro:parse_endpoints(Hosts).
|
||||
|
||||
%% Client ID is better to be unique to make it easier for Kafka side trouble shooting.
|
||||
make_client_id(InstanceId) ->
|
||||
InstanceIdBin0 = to_bin(InstanceId),
|
||||
% Removing the <<"bridge:">> from beginning for backward compatibility
|
||||
InstanceIdBin = binary:replace(InstanceIdBin0, <<"bridge:">>, <<>>),
|
||||
iolist_to_binary([InstanceIdBin, ":", atom_to_list(node())]).
|
||||
make_client_id(BridgeType0, BridgeName0) ->
|
||||
BridgeType = to_bin(BridgeType0),
|
||||
BridgeName = to_bin(BridgeName0),
|
||||
iolist_to_binary([BridgeType, ":", BridgeName, ":", atom_to_list(node())]).
|
||||
|
||||
sasl(none) ->
|
||||
undefined;
|
||||
|
|
|
@ -121,6 +121,8 @@ on_start(ResourceId, Config) ->
|
|||
#{
|
||||
authentication := Auth,
|
||||
bootstrap_hosts := BootstrapHosts0,
|
||||
bridge_type := BridgeType,
|
||||
bridge_name := BridgeName,
|
||||
hookpoint := _,
|
||||
kafka := #{
|
||||
max_batch_bytes := _,
|
||||
|
@ -134,7 +136,7 @@ on_start(ResourceId, Config) ->
|
|||
} = Config,
|
||||
BootstrapHosts = emqx_bridge_kafka_impl:hosts(BootstrapHosts0),
|
||||
%% Note: this is distinct per node.
|
||||
ClientID = make_client_id(ResourceId),
|
||||
ClientID = make_client_id(ResourceId, BridgeType, BridgeName),
|
||||
ClientOpts0 =
|
||||
case Auth of
|
||||
none -> [];
|
||||
|
@ -515,11 +517,11 @@ is_dry_run(ResourceId) ->
|
|||
string:equal(TestIdStart, ResourceId)
|
||||
end.
|
||||
|
||||
-spec make_client_id(resource_id()) -> atom().
|
||||
make_client_id(InstanceId) ->
|
||||
case is_dry_run(InstanceId) of
|
||||
-spec make_client_id(resource_id(), binary(), atom() | binary()) -> atom().
|
||||
make_client_id(ResourceId, BridgeType, BridgeName) ->
|
||||
case is_dry_run(ResourceId) of
|
||||
false ->
|
||||
ClientID0 = emqx_bridge_kafka_impl:make_client_id(InstanceId),
|
||||
ClientID0 = emqx_bridge_kafka_impl:make_client_id(BridgeType, BridgeName),
|
||||
binary_to_atom(ClientID0);
|
||||
true ->
|
||||
%% It is a dry run and we don't want to leak too many
|
||||
|
|
|
@ -29,10 +29,6 @@
|
|||
-define(kafka_client_id, kafka_client_id).
|
||||
-define(kafka_producers, kafka_producers).
|
||||
|
||||
%% TODO: rename this to `kafka_producer' after alias support is added
|
||||
%% to hocon; keeping this as just `kafka' for backwards compatibility.
|
||||
-define(BRIDGE_TYPE, kafka).
|
||||
|
||||
query_mode(#{kafka := #{query_mode := sync}}) ->
|
||||
simple_sync;
|
||||
query_mode(_) ->
|
||||
|
@ -46,6 +42,7 @@ on_start(InstId, Config) ->
|
|||
authentication := Auth,
|
||||
bootstrap_hosts := Hosts0,
|
||||
bridge_name := BridgeName,
|
||||
bridge_type := BridgeType,
|
||||
connect_timeout := ConnTimeout,
|
||||
kafka := KafkaConfig = #{
|
||||
message := MessageTemplate,
|
||||
|
@ -60,12 +57,11 @@ on_start(InstId, Config) ->
|
|||
KafkaHeadersTokens = preproc_kafka_headers(maps:get(kafka_headers, KafkaConfig, undefined)),
|
||||
KafkaExtHeadersTokens = preproc_ext_headers(maps:get(kafka_ext_headers, KafkaConfig, [])),
|
||||
KafkaHeadersValEncodeMode = maps:get(kafka_header_value_encode_mode, KafkaConfig, none),
|
||||
BridgeType = ?BRIDGE_TYPE,
|
||||
ResourceId = emqx_bridge_resource:resource_id(BridgeType, BridgeName),
|
||||
ok = emqx_resource:allocate_resource(InstId, ?kafka_resource_id, ResourceId),
|
||||
_ = maybe_install_wolff_telemetry_handlers(ResourceId),
|
||||
Hosts = emqx_bridge_kafka_impl:hosts(Hosts0),
|
||||
ClientId = emqx_bridge_kafka_impl:make_client_id(InstId),
|
||||
ClientId = emqx_bridge_kafka_impl:make_client_id(BridgeType, BridgeName),
|
||||
ok = emqx_resource:allocate_resource(InstId, ?kafka_client_id, ClientId),
|
||||
ClientConfig = #{
|
||||
min_metadata_refresh_interval => MinMetaRefreshInterval,
|
||||
|
@ -107,7 +103,7 @@ on_start(InstId, Config) ->
|
|||
_ ->
|
||||
string:equal(TestIdStart, InstId)
|
||||
end,
|
||||
WolffProducerConfig = producers_config(BridgeName, ClientId, KafkaConfig, IsDryRun),
|
||||
WolffProducerConfig = producers_config(BridgeType, BridgeName, ClientId, KafkaConfig, IsDryRun),
|
||||
case wolff:ensure_supervised_producers(ClientId, KafkaTopic, WolffProducerConfig) of
|
||||
{ok, Producers} ->
|
||||
ok = emqx_resource:allocate_resource(InstId, ?kafka_producers, Producers),
|
||||
|
@ -462,7 +458,7 @@ ssl(#{enable := true} = SSL) ->
|
|||
ssl(_) ->
|
||||
[].
|
||||
|
||||
producers_config(BridgeName, ClientId, Input, IsDryRun) ->
|
||||
producers_config(BridgeType, BridgeName, ClientId, Input, IsDryRun) ->
|
||||
#{
|
||||
max_batch_bytes := MaxBatchBytes,
|
||||
compression := Compression,
|
||||
|
@ -488,10 +484,9 @@ producers_config(BridgeName, ClientId, Input, IsDryRun) ->
|
|||
disk -> {false, replayq_dir(ClientId)};
|
||||
hybrid -> {true, replayq_dir(ClientId)}
|
||||
end,
|
||||
BridgeType = ?BRIDGE_TYPE,
|
||||
ResourceID = emqx_bridge_resource:resource_id(BridgeType, BridgeName),
|
||||
#{
|
||||
name => make_producer_name(BridgeName, IsDryRun),
|
||||
name => make_producer_name(BridgeType, BridgeName, IsDryRun),
|
||||
partitioner => partitioner(PartitionStrategy),
|
||||
partition_count_refresh_interval_seconds => PCntRefreshInterval,
|
||||
replayq_dir => ReplayqDir,
|
||||
|
@ -516,20 +511,15 @@ replayq_dir(ClientId) ->
|
|||
|
||||
%% Producer name must be an atom which will be used as a ETS table name for
|
||||
%% partition worker lookup.
|
||||
make_producer_name(BridgeName, IsDryRun) when is_atom(BridgeName) ->
|
||||
make_producer_name(atom_to_list(BridgeName), IsDryRun);
|
||||
make_producer_name(BridgeName, IsDryRun) ->
|
||||
make_producer_name(_BridgeType, _BridgeName, true = _IsDryRun) ->
|
||||
%% It is a dry run and we don't want to leak too many atoms
|
||||
%% so we use the default producer name instead of creating
|
||||
%% an unique name.
|
||||
probing_wolff_producers;
|
||||
make_producer_name(BridgeType, BridgeName, _IsDryRun) ->
|
||||
%% Woff needs an atom for ets table name registration. The assumption here is
|
||||
%% that bridges with new names are not often created.
|
||||
case IsDryRun of
|
||||
true ->
|
||||
%% It is a dry run and we don't want to leak too many atoms
|
||||
%% so we use the default producer name instead of creating
|
||||
%% an unique name.
|
||||
probing_wolff_producers;
|
||||
false ->
|
||||
binary_to_atom(iolist_to_binary(["kafka_producer_", BridgeName]))
|
||||
end.
|
||||
binary_to_atom(iolist_to_binary([BridgeType, "_", bin(BridgeName)])).
|
||||
|
||||
with_log_at_error(Fun, Log) ->
|
||||
try
|
||||
|
|
|
@ -66,7 +66,7 @@ only_once_tests() ->
|
|||
].
|
||||
|
||||
init_per_suite(Config) ->
|
||||
Config.
|
||||
[{bridge_type, <<"kafka_consumer">>} | Config].
|
||||
|
||||
end_per_suite(_Config) ->
|
||||
emqx_mgmt_api_test_util:end_suite(),
|
||||
|
@ -898,8 +898,9 @@ ensure_connected(Config) ->
|
|||
ok.
|
||||
|
||||
consumer_clientid(Config) ->
|
||||
ResourceId = resource_id(Config),
|
||||
binary_to_atom(emqx_bridge_kafka_impl:make_client_id(ResourceId)).
|
||||
BridgeType = ?config(bridge_type, Config),
|
||||
KafkaName = ?config(kafka_name, Config),
|
||||
binary_to_atom(emqx_bridge_kafka_impl:make_client_id(BridgeType, KafkaName)).
|
||||
|
||||
get_client_connection(Config) ->
|
||||
KafkaHost = ?config(kafka_host, Config),
|
||||
|
|
|
@ -40,6 +40,7 @@
|
|||
%% TODO: rename this to `kafka_producer' after alias support is added
|
||||
%% to hocon; keeping this as just `kafka' for backwards compatibility.
|
||||
-define(BRIDGE_TYPE, "kafka").
|
||||
-define(BRIDGE_TYPE_BIN, <<"kafka">>).
|
||||
|
||||
-define(APPS, [emqx_resource, emqx_bridge, emqx_rule_engine, emqx_bridge_kafka]).
|
||||
|
||||
|
@ -438,7 +439,7 @@ t_failed_creation_then_fix(Config) ->
|
|||
{ok, #{config := WrongConfigAtom1}} = emqx_bridge:create(
|
||||
Type, erlang:list_to_atom(Name), WrongConf
|
||||
),
|
||||
WrongConfigAtom = WrongConfigAtom1#{bridge_name => Name},
|
||||
WrongConfigAtom = WrongConfigAtom1#{bridge_name => Name, bridge_type => ?BRIDGE_TYPE_BIN},
|
||||
?assertThrow(Reason when is_list(Reason), ?PRODUCER:on_start(ResourceId, WrongConfigAtom)),
|
||||
%% before throwing, it should cleanup the client process. we
|
||||
%% retry because the supervisor might need some time to really
|
||||
|
@ -448,7 +449,7 @@ t_failed_creation_then_fix(Config) ->
|
|||
{ok, #{config := ValidConfigAtom1}} = emqx_bridge:create(
|
||||
Type, erlang:list_to_atom(Name), ValidConf
|
||||
),
|
||||
ValidConfigAtom = ValidConfigAtom1#{bridge_name => Name},
|
||||
ValidConfigAtom = ValidConfigAtom1#{bridge_name => Name, bridge_type => ?BRIDGE_TYPE_BIN},
|
||||
{ok, State} = ?PRODUCER:on_start(ResourceId, ValidConfigAtom),
|
||||
Time = erlang:unique_integer(),
|
||||
BinTime = integer_to_binary(Time),
|
||||
|
@ -540,7 +541,7 @@ t_nonexistent_topic(_Config) ->
|
|||
{ok, #{config := ValidConfigAtom1}} = emqx_bridge:create(
|
||||
Type, erlang:list_to_atom(Name), Conf
|
||||
),
|
||||
ValidConfigAtom = ValidConfigAtom1#{bridge_name => Name},
|
||||
ValidConfigAtom = ValidConfigAtom1#{bridge_name => Name, bridge_type => ?BRIDGE_TYPE_BIN},
|
||||
?assertThrow(_, ?PRODUCER:on_start(ResourceId, ValidConfigAtom)),
|
||||
ok = emqx_bridge_resource:remove(BridgeId),
|
||||
delete_all_bridges(),
|
||||
|
@ -585,7 +586,7 @@ t_send_message_with_headers(Config) ->
|
|||
{ok, #{config := ConfigAtom1}} = emqx_bridge:create(
|
||||
Type, erlang:list_to_atom(Name), Conf
|
||||
),
|
||||
ConfigAtom = ConfigAtom1#{bridge_name => Name},
|
||||
ConfigAtom = ConfigAtom1#{bridge_name => Name, bridge_type => ?BRIDGE_TYPE_BIN},
|
||||
{ok, State} = ?PRODUCER:on_start(ResourceId, ConfigAtom),
|
||||
Time1 = erlang:unique_integer(),
|
||||
BinTime1 = integer_to_binary(Time1),
|
||||
|
@ -807,7 +808,7 @@ t_wrong_headers_from_message(Config) ->
|
|||
{ok, #{config := ConfigAtom1}} = emqx_bridge:create(
|
||||
Type, erlang:list_to_atom(Name), Conf
|
||||
),
|
||||
ConfigAtom = ConfigAtom1#{bridge_name => Name},
|
||||
ConfigAtom = ConfigAtom1#{bridge_name => Name, bridge_type => ?BRIDGE_TYPE_BIN},
|
||||
{ok, State} = ?PRODUCER:on_start(ResourceId, ConfigAtom),
|
||||
Time1 = erlang:unique_integer(),
|
||||
Payload1 = <<"wrong_header">>,
|
||||
|
|
Loading…
Reference in New Issue