diff --git a/apps/emqx_bridge/src/emqx_bridge_resource.erl b/apps/emqx_bridge/src/emqx_bridge_resource.erl index a48d0294e..65e05b950 100644 --- a/apps/emqx_bridge/src/emqx_bridge_resource.erl +++ b/apps/emqx_bridge/src/emqx_bridge_resource.erl @@ -175,14 +175,15 @@ create(BridgeId, Conf) -> create(Type, Name, Conf) -> create(Type, Name, Conf, #{}). -create(Type, Name, Conf, Opts) -> +create(Type, Name, Conf0, Opts) -> ?SLOG(info, #{ msg => "create bridge", type => Type, name => Name, - config => emqx_utils:redact(Conf) + config => emqx_utils:redact(Conf0) }), TypeBin = bin(Type), + Conf = Conf0#{bridge_type => TypeBin, bridge_name => Name}, {ok, _Data} = emqx_resource:create_local( resource_id(Type, Name), <<"emqx_bridge">>, @@ -249,8 +250,9 @@ recreate(Type, Name) -> recreate(Type, Name, Conf) -> recreate(Type, Name, Conf, #{}). -recreate(Type, Name, Conf, Opts) -> +recreate(Type, Name, Conf0, Opts) -> TypeBin = bin(Type), + Conf = Conf0#{bridge_type => TypeBin, bridge_name => Name}, emqx_resource:recreate_local( resource_id(Type, Name), bridge_to_resource_type(Type), @@ -267,17 +269,18 @@ create_dry_run(Type, Conf0) -> Conf1 = maps:without([<<"name">>], Conf0), RawConf = #{<<"bridges">> => #{TypeBin => #{<<"temp_name">> => Conf1}}}, try - #{bridges := #{TypeAtom := #{temp_name := Conf}}} = + #{bridges := #{TypeAtom := #{temp_name := Conf2}}} = hocon_tconf:check_plain( emqx_bridge_schema, RawConf, #{atom_key => true, required => false} ), + Conf = Conf2#{bridge_type => TypeBin, bridge_name => TmpName}, case emqx_connector_ssl:convert_certs(TmpPath, Conf) of {error, Reason} -> {error, Reason}; {ok, ConfNew} -> - ParseConf = parse_confs(bin(Type), TmpName, ConfNew), + ParseConf = parse_confs(TypeBin, TmpName, ConfNew), emqx_resource:create_dry_run_local(bridge_to_resource_type(Type), ParseConf) end catch @@ -387,15 +390,7 @@ parse_confs(Type, Name, Conf) when ?IS_INGRESS_BRIDGE(Type) -> %% receives a message from the external database. BId = bridge_id(Type, Name), BridgeHookpoint = bridge_hookpoint(BId), - Conf#{hookpoint => BridgeHookpoint, bridge_name => Name}; -%% TODO: rename this to `kafka_producer' after alias support is added -%% to hocon; keeping this as just `kafka' for backwards compatibility. -parse_confs(<<"kafka">> = _Type, Name, Conf) -> - Conf#{bridge_name => Name}; -parse_confs(<<"pulsar_producer">> = _Type, Name, Conf) -> - Conf#{bridge_name => Name}; -parse_confs(<<"kinesis_producer">> = _Type, Name, Conf) -> - Conf#{bridge_name => Name}; + Conf#{hookpoint => BridgeHookpoint}; parse_confs(BridgeType, BridgeName, Config) -> connector_config(BridgeType, BridgeName, Config). diff --git a/apps/emqx_bridge_azure_event_hub/src/emqx_bridge_azure_event_hub.erl b/apps/emqx_bridge_azure_event_hub/src/emqx_bridge_azure_event_hub.erl index c563a35d8..b34d6132a 100644 --- a/apps/emqx_bridge_azure_event_hub/src/emqx_bridge_azure_event_hub.erl +++ b/apps/emqx_bridge_azure_event_hub/src/emqx_bridge_azure_event_hub.erl @@ -166,14 +166,14 @@ values(producer) -> %% `emqx_bridge_resource' API %%------------------------------------------------------------------------------------------------- -connector_config(Config, BridgeName) -> +connector_config(Config, _BridgeName) -> %% Default port for AEH is 9093 BootstrapHosts0 = maps:get(bootstrap_hosts, Config), BootstrapHosts = emqx_schema:parse_servers( BootstrapHosts0, emqx_bridge_azure_event_hub:host_opts() ), - Config#{bridge_name => BridgeName, bootstrap_hosts := BootstrapHosts}. + Config#{bootstrap_hosts := BootstrapHosts}. %%------------------------------------------------------------------------------------------------- %% Internal fns diff --git a/apps/emqx_bridge_azure_event_hub/test/emqx_bridge_azure_event_hub_producer_SUITE.erl b/apps/emqx_bridge_azure_event_hub/test/emqx_bridge_azure_event_hub_producer_SUITE.erl index 91016295f..87c2127c2 100644 --- a/apps/emqx_bridge_azure_event_hub/test/emqx_bridge_azure_event_hub_producer_SUITE.erl +++ b/apps/emqx_bridge_azure_event_hub/test/emqx_bridge_azure_event_hub_producer_SUITE.erl @@ -282,6 +282,7 @@ t_same_name_azure_kafka_bridges(AehConfig) -> ConfigKafka = lists:keyreplace(bridge_type, 1, AehConfig, {bridge_type, ?KAFKA_BRIDGE_TYPE}), BridgeName = ?config(bridge_name, AehConfig), AehResourceId = emqx_bridge_testlib:resource_id(AehConfig), + KafkaResourceId = emqx_bridge_testlib:resource_id(ConfigKafka), TracePoint = emqx_bridge_kafka_impl_producer_sync_query, %% creates the AEH bridge and check it's working ok = emqx_bridge_testlib:t_sync_query( @@ -292,6 +293,9 @@ t_same_name_azure_kafka_bridges(AehConfig) -> ), %% than creates a Kafka bridge with same name and delete it after creation ok = emqx_bridge_testlib:t_create_via_http(ConfigKafka), + %% check that both bridges are healthy + ?assertEqual({ok, connected}, emqx_resource_manager:health_check(AehResourceId)), + ?assertEqual({ok, connected}, emqx_resource_manager:health_check(KafkaResourceId)), ?assertMatch( {{ok, _}, {ok, _}}, ?wait_async_action( diff --git a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl.erl b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl.erl index 64ac8cf99..7532ba963 100644 --- a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl.erl +++ b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl.erl @@ -7,7 +7,7 @@ -export([ hosts/1, - make_client_id/1, + make_client_id/2, sasl/1, socket_opts/1 ]). @@ -24,11 +24,10 @@ hosts(Hosts) when is_list(Hosts) -> kpro:parse_endpoints(Hosts). %% Client ID is better to be unique to make it easier for Kafka side trouble shooting. -make_client_id(InstanceId) -> - InstanceIdBin0 = to_bin(InstanceId), - % Removing the <<"bridge:">> from beginning for backward compatibility - InstanceIdBin = binary:replace(InstanceIdBin0, <<"bridge:">>, <<>>), - iolist_to_binary([InstanceIdBin, ":", atom_to_list(node())]). +make_client_id(BridgeType0, BridgeName0) -> + BridgeType = to_bin(BridgeType0), + BridgeName = to_bin(BridgeName0), + iolist_to_binary([BridgeType, ":", BridgeName, ":", atom_to_list(node())]). sasl(none) -> undefined; diff --git a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_consumer.erl b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_consumer.erl index 06176ac93..b16f163fb 100644 --- a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_consumer.erl +++ b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_consumer.erl @@ -121,6 +121,8 @@ on_start(ResourceId, Config) -> #{ authentication := Auth, bootstrap_hosts := BootstrapHosts0, + bridge_type := BridgeType, + bridge_name := BridgeName, hookpoint := _, kafka := #{ max_batch_bytes := _, @@ -134,7 +136,7 @@ on_start(ResourceId, Config) -> } = Config, BootstrapHosts = emqx_bridge_kafka_impl:hosts(BootstrapHosts0), %% Note: this is distinct per node. - ClientID = make_client_id(ResourceId), + ClientID = make_client_id(ResourceId, BridgeType, BridgeName), ClientOpts0 = case Auth of none -> []; @@ -515,11 +517,11 @@ is_dry_run(ResourceId) -> string:equal(TestIdStart, ResourceId) end. --spec make_client_id(resource_id()) -> atom(). -make_client_id(InstanceId) -> - case is_dry_run(InstanceId) of +-spec make_client_id(resource_id(), binary(), atom() | binary()) -> atom(). +make_client_id(ResourceId, BridgeType, BridgeName) -> + case is_dry_run(ResourceId) of false -> - ClientID0 = emqx_bridge_kafka_impl:make_client_id(InstanceId), + ClientID0 = emqx_bridge_kafka_impl:make_client_id(BridgeType, BridgeName), binary_to_atom(ClientID0); true -> %% It is a dry run and we don't want to leak too many diff --git a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl index 0b8f526f6..3485ac752 100644 --- a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl +++ b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl @@ -29,10 +29,6 @@ -define(kafka_client_id, kafka_client_id). -define(kafka_producers, kafka_producers). -%% TODO: rename this to `kafka_producer' after alias support is added -%% to hocon; keeping this as just `kafka' for backwards compatibility. --define(BRIDGE_TYPE, kafka). - query_mode(#{kafka := #{query_mode := sync}}) -> simple_sync; query_mode(_) -> @@ -46,6 +42,7 @@ on_start(InstId, Config) -> authentication := Auth, bootstrap_hosts := Hosts0, bridge_name := BridgeName, + bridge_type := BridgeType, connect_timeout := ConnTimeout, kafka := KafkaConfig = #{ message := MessageTemplate, @@ -60,12 +57,11 @@ on_start(InstId, Config) -> KafkaHeadersTokens = preproc_kafka_headers(maps:get(kafka_headers, KafkaConfig, undefined)), KafkaExtHeadersTokens = preproc_ext_headers(maps:get(kafka_ext_headers, KafkaConfig, [])), KafkaHeadersValEncodeMode = maps:get(kafka_header_value_encode_mode, KafkaConfig, none), - BridgeType = ?BRIDGE_TYPE, ResourceId = emqx_bridge_resource:resource_id(BridgeType, BridgeName), ok = emqx_resource:allocate_resource(InstId, ?kafka_resource_id, ResourceId), _ = maybe_install_wolff_telemetry_handlers(ResourceId), Hosts = emqx_bridge_kafka_impl:hosts(Hosts0), - ClientId = emqx_bridge_kafka_impl:make_client_id(InstId), + ClientId = emqx_bridge_kafka_impl:make_client_id(BridgeType, BridgeName), ok = emqx_resource:allocate_resource(InstId, ?kafka_client_id, ClientId), ClientConfig = #{ min_metadata_refresh_interval => MinMetaRefreshInterval, @@ -107,7 +103,7 @@ on_start(InstId, Config) -> _ -> string:equal(TestIdStart, InstId) end, - WolffProducerConfig = producers_config(BridgeName, ClientId, KafkaConfig, IsDryRun), + WolffProducerConfig = producers_config(BridgeType, BridgeName, ClientId, KafkaConfig, IsDryRun), case wolff:ensure_supervised_producers(ClientId, KafkaTopic, WolffProducerConfig) of {ok, Producers} -> ok = emqx_resource:allocate_resource(InstId, ?kafka_producers, Producers), @@ -462,7 +458,7 @@ ssl(#{enable := true} = SSL) -> ssl(_) -> []. -producers_config(BridgeName, ClientId, Input, IsDryRun) -> +producers_config(BridgeType, BridgeName, ClientId, Input, IsDryRun) -> #{ max_batch_bytes := MaxBatchBytes, compression := Compression, @@ -488,10 +484,9 @@ producers_config(BridgeName, ClientId, Input, IsDryRun) -> disk -> {false, replayq_dir(ClientId)}; hybrid -> {true, replayq_dir(ClientId)} end, - BridgeType = ?BRIDGE_TYPE, ResourceID = emqx_bridge_resource:resource_id(BridgeType, BridgeName), #{ - name => make_producer_name(BridgeName, IsDryRun), + name => make_producer_name(BridgeType, BridgeName, IsDryRun), partitioner => partitioner(PartitionStrategy), partition_count_refresh_interval_seconds => PCntRefreshInterval, replayq_dir => ReplayqDir, @@ -516,20 +511,15 @@ replayq_dir(ClientId) -> %% Producer name must be an atom which will be used as a ETS table name for %% partition worker lookup. -make_producer_name(BridgeName, IsDryRun) when is_atom(BridgeName) -> - make_producer_name(atom_to_list(BridgeName), IsDryRun); -make_producer_name(BridgeName, IsDryRun) -> +make_producer_name(_BridgeType, _BridgeName, true = _IsDryRun) -> + %% It is a dry run and we don't want to leak too many atoms + %% so we use the default producer name instead of creating + %% an unique name. + probing_wolff_producers; +make_producer_name(BridgeType, BridgeName, _IsDryRun) -> %% Woff needs an atom for ets table name registration. The assumption here is %% that bridges with new names are not often created. - case IsDryRun of - true -> - %% It is a dry run and we don't want to leak too many atoms - %% so we use the default producer name instead of creating - %% an unique name. - probing_wolff_producers; - false -> - binary_to_atom(iolist_to_binary(["kafka_producer_", BridgeName])) - end. + binary_to_atom(iolist_to_binary([BridgeType, "_", bin(BridgeName)])). with_log_at_error(Fun, Log) -> try diff --git a/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_consumer_SUITE.erl b/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_consumer_SUITE.erl index 756fa8b3b..2f5840f41 100644 --- a/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_consumer_SUITE.erl +++ b/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_consumer_SUITE.erl @@ -66,7 +66,7 @@ only_once_tests() -> ]. init_per_suite(Config) -> - Config. + [{bridge_type, <<"kafka_consumer">>} | Config]. end_per_suite(_Config) -> emqx_mgmt_api_test_util:end_suite(), @@ -898,8 +898,9 @@ ensure_connected(Config) -> ok. consumer_clientid(Config) -> - ResourceId = resource_id(Config), - binary_to_atom(emqx_bridge_kafka_impl:make_client_id(ResourceId)). + BridgeType = ?config(bridge_type, Config), + KafkaName = ?config(kafka_name, Config), + binary_to_atom(emqx_bridge_kafka_impl:make_client_id(BridgeType, KafkaName)). get_client_connection(Config) -> KafkaHost = ?config(kafka_host, Config), diff --git a/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_producer_SUITE.erl b/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_producer_SUITE.erl index ba11ddf14..432ce8697 100644 --- a/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_producer_SUITE.erl +++ b/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_producer_SUITE.erl @@ -40,6 +40,7 @@ %% TODO: rename this to `kafka_producer' after alias support is added %% to hocon; keeping this as just `kafka' for backwards compatibility. -define(BRIDGE_TYPE, "kafka"). +-define(BRIDGE_TYPE_BIN, <<"kafka">>). -define(APPS, [emqx_resource, emqx_bridge, emqx_rule_engine, emqx_bridge_kafka]). @@ -438,7 +439,7 @@ t_failed_creation_then_fix(Config) -> {ok, #{config := WrongConfigAtom1}} = emqx_bridge:create( Type, erlang:list_to_atom(Name), WrongConf ), - WrongConfigAtom = WrongConfigAtom1#{bridge_name => Name}, + WrongConfigAtom = WrongConfigAtom1#{bridge_name => Name, bridge_type => ?BRIDGE_TYPE_BIN}, ?assertThrow(Reason when is_list(Reason), ?PRODUCER:on_start(ResourceId, WrongConfigAtom)), %% before throwing, it should cleanup the client process. we %% retry because the supervisor might need some time to really @@ -448,7 +449,7 @@ t_failed_creation_then_fix(Config) -> {ok, #{config := ValidConfigAtom1}} = emqx_bridge:create( Type, erlang:list_to_atom(Name), ValidConf ), - ValidConfigAtom = ValidConfigAtom1#{bridge_name => Name}, + ValidConfigAtom = ValidConfigAtom1#{bridge_name => Name, bridge_type => ?BRIDGE_TYPE_BIN}, {ok, State} = ?PRODUCER:on_start(ResourceId, ValidConfigAtom), Time = erlang:unique_integer(), BinTime = integer_to_binary(Time), @@ -540,7 +541,7 @@ t_nonexistent_topic(_Config) -> {ok, #{config := ValidConfigAtom1}} = emqx_bridge:create( Type, erlang:list_to_atom(Name), Conf ), - ValidConfigAtom = ValidConfigAtom1#{bridge_name => Name}, + ValidConfigAtom = ValidConfigAtom1#{bridge_name => Name, bridge_type => ?BRIDGE_TYPE_BIN}, ?assertThrow(_, ?PRODUCER:on_start(ResourceId, ValidConfigAtom)), ok = emqx_bridge_resource:remove(BridgeId), delete_all_bridges(), @@ -585,7 +586,7 @@ t_send_message_with_headers(Config) -> {ok, #{config := ConfigAtom1}} = emqx_bridge:create( Type, erlang:list_to_atom(Name), Conf ), - ConfigAtom = ConfigAtom1#{bridge_name => Name}, + ConfigAtom = ConfigAtom1#{bridge_name => Name, bridge_type => ?BRIDGE_TYPE_BIN}, {ok, State} = ?PRODUCER:on_start(ResourceId, ConfigAtom), Time1 = erlang:unique_integer(), BinTime1 = integer_to_binary(Time1), @@ -807,7 +808,7 @@ t_wrong_headers_from_message(Config) -> {ok, #{config := ConfigAtom1}} = emqx_bridge:create( Type, erlang:list_to_atom(Name), Conf ), - ConfigAtom = ConfigAtom1#{bridge_name => Name}, + ConfigAtom = ConfigAtom1#{bridge_name => Name, bridge_type => ?BRIDGE_TYPE_BIN}, {ok, State} = ?PRODUCER:on_start(ResourceId, ConfigAtom), Time1 = erlang:unique_integer(), Payload1 = <<"wrong_header">>,