diff --git a/apps/emqx_bridge/src/emqx_bridge.erl b/apps/emqx_bridge/src/emqx_bridge.erl index 5bc83dbd9..d5d5adff1 100644 --- a/apps/emqx_bridge/src/emqx_bridge.erl +++ b/apps/emqx_bridge/src/emqx_bridge.erl @@ -55,7 +55,10 @@ T == gcp_pubsub; T == influxdb_api_v1; T == influxdb_api_v2; - T == kafka_producer; + %% TODO: rename this to `kafka_producer' after alias support is + %% added to hocon; keeping this as just `kafka' for backwards + %% compatibility. + T == kafka; T == redis_single; T == redis_sentinel; T == redis_cluster; diff --git a/apps/emqx_bridge/src/emqx_bridge_resource.erl b/apps/emqx_bridge/src/emqx_bridge_resource.erl index fde823ea7..6426a46b7 100644 --- a/apps/emqx_bridge/src/emqx_bridge_resource.erl +++ b/apps/emqx_bridge/src/emqx_bridge_resource.erl @@ -309,7 +309,9 @@ parse_confs(Type, Name, Conf) when ?IS_INGRESS_BRIDGE(Type) -> %% receives a message from the external database. BId = bridge_id(Type, Name), Conf#{hookpoint => <<"$bridges/", BId/binary>>, bridge_name => Name}; -parse_confs(<<"kafka_producer">> = _Type, Name, Conf) -> +%% TODO: rename this to `kafka_producer' after alias support is added +%% to hocon; keeping this as just `kafka' for backwards compatibility. +parse_confs(<<"kafka">> = _Type, Name, Conf) -> Conf#{bridge_name => Name}; parse_confs(_Type, _Name, Conf) -> Conf. diff --git a/lib-ee/emqx_ee_bridge/i18n/emqx_ee_bridge_kafka.conf b/lib-ee/emqx_ee_bridge/i18n/emqx_ee_bridge_kafka.conf index 6d2ca46ab..636573d07 100644 --- a/lib-ee/emqx_ee_bridge/i18n/emqx_ee_bridge_kafka.conf +++ b/lib-ee/emqx_ee_bridge/i18n/emqx_ee_bridge_kafka.conf @@ -51,8 +51,8 @@ emqx_ee_bridge_kafka { } producer_opts { desc { - en: "Local MQTT data source and Kafka bridge configs. Should not configure this if the bridge is used as a rule action." - zh: "本地 MQTT 数据源和 Kafka 桥接的配置。若该桥接用于规则的动作,则必须将该配置项删除。" + en: "Local MQTT data source and Kafka bridge configs." + zh: "本地 MQTT 数据源和 Kafka 桥接的配置。" } label { en: "MQTT to Kafka" @@ -61,8 +61,8 @@ emqx_ee_bridge_kafka { } mqtt_topic { desc { - en: "MQTT topic or topic as data source (bridge input)." - zh: "指定 MQTT 主题作为桥接的数据源" + en: "MQTT topic or topic as data source (bridge input). Should not configure this if the bridge is used as a rule action." + zh: "指定 MQTT 主题作为桥接的数据源。 若该桥接用于规则的动作,则必须将该配置项删除。" } label { en: "Source MQTT Topic" diff --git a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.erl b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.erl index 0746736f3..ec81b7935 100644 --- a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.erl +++ b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.erl @@ -66,7 +66,9 @@ examples(Method) -> resource_type(Type) when is_binary(Type) -> resource_type(binary_to_atom(Type, utf8)); resource_type(kafka_consumer) -> emqx_bridge_impl_kafka_consumer; -resource_type(kafka_producer) -> emqx_bridge_impl_kafka_producer; +%% TODO: rename this to `kafka_producer' after alias support is added +%% to hocon; keeping this as just `kafka' for backwards compatibility. +resource_type(kafka) -> emqx_bridge_impl_kafka_producer; resource_type(hstreamdb) -> emqx_ee_connector_hstreamdb; resource_type(gcp_pubsub) -> emqx_ee_connector_gcp_pubsub; resource_type(mongodb_rs) -> emqx_ee_connector_mongodb; @@ -145,12 +147,23 @@ mongodb_structs() -> kafka_structs() -> [ - {Type, + %% TODO: rename this to `kafka_producer' after alias support + %% is added to hocon; keeping this as just `kafka' for + %% backwards compatibility. + {kafka, mk( - hoconsc:map(name, ref(emqx_ee_bridge_kafka, Type)), - #{desc => <<"Kafka ", Name/binary, " Bridge Config">>, required => false} + hoconsc:map(name, ref(emqx_ee_bridge_kafka, kafka_producer)), + #{ + desc => <<"Kafka Producer Bridge Config">>, + required => false, + converter => fun emqx_ee_bridge_kafka:kafka_producer_converter/2 + } + )}, + {kafka_consumer, + mk( + hoconsc:map(name, ref(emqx_ee_bridge_kafka, kafka_consumer)), + #{desc => <<"Kafka Consumer Bridge Config">>, required => false} )} - || {Type, Name} <- [{kafka_producer, <<"Producer">>}, {kafka_consumer, <<"Consumer">>}] ]. influxdb_structs() -> diff --git a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_kafka.erl b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_kafka.erl index d8f7f7fc8..583acc48d 100644 --- a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_kafka.erl +++ b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_kafka.erl @@ -30,13 +30,18 @@ host_opts/0 ]). +-export([kafka_producer_converter/2]). + %% ------------------------------------------------------------------------------------------------- %% api conn_bridge_examples(Method) -> [ #{ - <<"kafka_producer">> => #{ + %% TODO: rename this to `kafka_producer' after alias + %% support is added to hocon; keeping this as just `kafka' + %% for backwards compatibility. + <<"kafka">> => #{ summary => <<"Kafka Producer Bridge">>, value => values(Method) } @@ -171,7 +176,7 @@ fields(producer_opts) -> %% Note: there's an implicit convention in `emqx_bridge' that, %% for egress bridges with this config, the published messages %% will be forwarded to such bridges. - {local_topic, mk(binary(), #{desc => ?DESC(mqtt_topic)})}, + {local_topic, mk(binary(), #{required => false, desc => ?DESC(mqtt_topic)})}, {kafka, mk(ref(producer_kafka_opts), #{ required => true, @@ -332,10 +337,33 @@ struct_names() -> %% internal type_field() -> {type, - mk(enum([kafka_consumer, kafka_producer]), #{required => true, desc => ?DESC("desc_type")})}. + %% TODO: rename `kafka' to `kafka_producer' after alias + %% support is added to hocon; keeping this as just `kafka' for + %% backwards compatibility. + mk(enum([kafka_consumer, kafka]), #{required => true, desc => ?DESC("desc_type")})}. name_field() -> {name, mk(binary(), #{required => true, desc => ?DESC("desc_name")})}. ref(Name) -> hoconsc:ref(?MODULE, Name). + +kafka_producer_converter(undefined, _HoconOpts) -> + undefined; +kafka_producer_converter( + #{<<"producer">> := OldOpts0, <<"bootstrap_hosts">> := _} = Config0, _HoconOpts +) -> + %% old schema + MQTTOpts = maps:get(<<"mqtt">>, OldOpts0, #{}), + LocalTopic = maps:get(<<"topic">>, MQTTOpts, undefined), + KafkaOpts = maps:get(<<"kafka">>, OldOpts0), + Config = maps:without([<<"producer">>], Config0), + case LocalTopic =:= undefined of + true -> + Config#{<<"kafka">> => KafkaOpts}; + false -> + Config#{<<"kafka">> => KafkaOpts, <<"local_topic">> => LocalTopic} + end; +kafka_producer_converter(Config, _HoconOpts) -> + %% new schema + Config. diff --git a/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_producer.erl b/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_producer.erl index 785825a24..d46f687dd 100644 --- a/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_producer.erl +++ b/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_producer.erl @@ -22,6 +22,10 @@ -include_lib("emqx/include/logger.hrl"). +%% TODO: rename this to `kafka_producer' after alias support is added +%% to hocon; keeping this as just `kafka' for backwards compatibility. +-define(BRIDGE_TYPE, kafka). + callback_mode() -> async_if_possible. %% @doc Config schema is defined in emqx_ee_bridge_kafka. @@ -37,12 +41,11 @@ on_start(InstId, Config) -> socket_opts := SocketOpts, ssl := SSL } = Config, - BridgeType = kafka_consumer, + BridgeType = ?BRIDGE_TYPE, ResourceId = emqx_bridge_resource:resource_id(BridgeType, BridgeName), _ = maybe_install_wolff_telemetry_handlers(ResourceId), Hosts = emqx_bridge_impl_kafka:hosts(Hosts0), - KafkaType = kafka_producer, - ClientId = emqx_bridge_impl_kafka:make_client_id(KafkaType, BridgeName), + ClientId = emqx_bridge_impl_kafka:make_client_id(BridgeType, BridgeName), ClientConfig = #{ min_metadata_refresh_interval => MinMetaRefreshInterval, connect_timeout => ConnTimeout, @@ -315,7 +318,7 @@ producers_config(BridgeName, ClientId, Input, IsDryRun) -> disk -> {false, replayq_dir(ClientId)}; hybrid -> {true, replayq_dir(ClientId)} end, - BridgeType = kafka_producer, + BridgeType = ?BRIDGE_TYPE, ResourceID = emqx_bridge_resource:resource_id(BridgeType, BridgeName), #{ name => make_producer_name(BridgeName, IsDryRun), diff --git a/lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_producer_SUITE.erl b/lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_producer_SUITE.erl index 7a214de08..4b9642442 100644 --- a/lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_producer_SUITE.erl +++ b/lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_producer_SUITE.erl @@ -30,16 +30,15 @@ -include_lib("emqx/include/emqx.hrl"). -include("emqx_dashboard.hrl"). --define(CONTENT_TYPE, "application/x-www-form-urlencoded"). - -define(HOST, "http://127.0.0.1:18083"). %% -define(API_VERSION, "v5"). -define(BASE_PATH, "/api/v5"). --define(APP_DASHBOARD, emqx_dashboard). --define(APP_MANAGEMENT, emqx_management). +%% TODO: rename this to `kafka_producer' after alias support is added +%% to hocon; keeping this as just `kafka' for backwards compatibility. +-define(BRIDGE_TYPE, "kafka"). %%------------------------------------------------------------------------------ %% CT boilerplate @@ -233,7 +232,7 @@ kafka_bridge_rest_api_all_auth_methods(UseSSL) -> ok. kafka_bridge_rest_api_helper(Config) -> - BridgeType = "kafka_producer", + BridgeType = ?BRIDGE_TYPE, BridgeName = "my_kafka_bridge", BridgeID = emqx_bridge_resource:bridge_id( erlang:list_to_binary(BridgeType), @@ -244,6 +243,7 @@ kafka_bridge_rest_api_helper(Config) -> erlang:list_to_binary(BridgeName) ), UrlEscColon = "%3A", + BridgesProbeParts = ["bridges_probe"], BridgeIdUrlEnc = BridgeType ++ UrlEscColon ++ BridgeName, BridgesParts = ["bridges"], BridgesPartsIdDeleteAlsoActions = ["bridges", BridgeIdUrlEnc ++ "?also_delete_dep_actions"], @@ -277,7 +277,7 @@ kafka_bridge_rest_api_helper(Config) -> %% Create new Kafka bridge KafkaTopic = "test-topic-one-partition", CreateBodyTmp = #{ - <<"type">> => <<"kafka_producer">>, + <<"type">> => <>, <<"name">> => <<"my_kafka_bridge">>, <<"bootstrap_hosts">> => iolist_to_binary(maps:get(<<"bootstrap_hosts">>, Config)), <<"enable">> => true, @@ -300,6 +300,13 @@ kafka_bridge_rest_api_helper(Config) -> {ok, 201, _Data} = show(http_post(BridgesParts, show(CreateBody))), %% Check that the new bridge is in the list of bridges true = MyKafkaBridgeExists(), + %% Probe should work + {ok, 204, _} = http_post(BridgesProbeParts, CreateBody), + %% no extra atoms should be created when probing + AtomsBefore = erlang:system_info(atom_count), + {ok, 204, _} = http_post(BridgesProbeParts, CreateBody), + AtomsAfter = erlang:system_info(atom_count), + ?assertEqual(AtomsBefore, AtomsAfter), %% Create a rule that uses the bridge {ok, 201, _Rule} = http_post( ["rules"], @@ -377,10 +384,10 @@ t_failed_creation_then_fix(Config) -> ValidAuthSettings = valid_sasl_plain_settings(), WrongAuthSettings = ValidAuthSettings#{"password" := "wrong"}, Hash = erlang:phash2([HostsString, ?FUNCTION_NAME]), - Type = kafka_producer, + Type = ?BRIDGE_TYPE, Name = "kafka_bridge_name_" ++ erlang:integer_to_list(Hash), - ResourceId = emqx_bridge_resource:resource_id("kafka_producer", Name), - BridgeId = emqx_bridge_resource:bridge_id("kafka_producer", Name), + ResourceId = emqx_bridge_resource:resource_id(Type, Name), + BridgeId = emqx_bridge_resource:bridge_id(Type, Name), KafkaTopic = "test-topic-one-partition", WrongConf = config(#{ "authentication" => WrongAuthSettings, @@ -511,7 +518,7 @@ publish_helper( end, Hash = erlang:phash2([HostsString, AuthSettings, SSLSettings]), Name = "kafka_bridge_name_" ++ erlang:integer_to_list(Hash), - Type = "kafka_producer", + Type = ?BRIDGE_TYPE, InstId = emqx_bridge_resource:resource_id(Type, Name), KafkaTopic = "test-topic-one-partition", Conf = config( @@ -526,7 +533,7 @@ publish_helper( Conf0 ), {ok, _} = emqx_bridge:create( - <<"kafka_producer">>, list_to_binary(Name), Conf + <>, list_to_binary(Name), Conf ), Time = erlang:unique_integer(), BinTime = integer_to_binary(Time), @@ -600,8 +607,11 @@ hocon_config(Args) -> %% erlfmt-ignore hocon_config_template() -> +%% TODO: rename the type to `kafka_producer' after alias support is +%% added to hocon; keeping this as just `kafka' for backwards +%% compatibility. """ -bridges.kafka_producer.{{ bridge_name }} { +bridges.kafka.{{ bridge_name }} { bootstrap_hosts = \"{{ kafka_hosts_string }}\" enable = true authentication = {{{ authentication }}} diff --git a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_kafka_tests.erl b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_kafka_tests.erl new file mode 100644 index 000000000..47c21b673 --- /dev/null +++ b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_kafka_tests.erl @@ -0,0 +1,181 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- + +-module(emqx_ee_bridge_kafka_tests). + +-include_lib("eunit/include/eunit.hrl"). + +%%=========================================================================== +%% Test cases +%%=========================================================================== + +kafka_producer_test() -> + Conf1 = parse(kafka_producer_old_hocon(_WithLocalTopic0 = false)), + Conf2 = parse(kafka_producer_old_hocon(_WithLocalTopic1 = true)), + Conf3 = parse(kafka_producer_new_hocon()), + + ?assertMatch( + #{ + <<"bridges">> := + #{ + <<"kafka">> := + #{ + <<"myproducer">> := + #{<<"kafka">> := #{}} + } + } + }, + check(Conf1) + ), + ?assertNotMatch( + #{ + <<"bridges">> := + #{ + <<"kafka">> := + #{ + <<"myproducer">> := + #{<<"local_topic">> := _} + } + } + }, + check(Conf1) + ), + ?assertMatch( + #{ + <<"bridges">> := + #{ + <<"kafka">> := + #{ + <<"myproducer">> := + #{ + <<"kafka">> := #{}, + <<"local_topic">> := <<"mqtt/local">> + } + } + } + }, + check(Conf2) + ), + ?assertMatch( + #{ + <<"bridges">> := + #{ + <<"kafka">> := + #{ + <<"myproducer">> := + #{ + <<"kafka">> := #{}, + <<"local_topic">> := <<"mqtt/local">> + } + } + } + }, + check(Conf3) + ), + + ok. + +%%=========================================================================== +%% Helper functions +%%=========================================================================== + +parse(Hocon) -> + {ok, Conf} = hocon:binary(Hocon), + Conf. + +check(Conf) when is_map(Conf) -> + hocon_tconf:check_plain(emqx_bridge_schema, Conf). + +%%=========================================================================== +%% Data section +%%=========================================================================== + +%% erlfmt-ignore +kafka_producer_old_hocon(_WithLocalTopic = true) -> + kafka_producer_old_hocon("mqtt {topic = \"mqtt/local\"}\n"); +kafka_producer_old_hocon(_WithLocalTopic = false) -> + kafka_producer_old_hocon("mqtt {}\n"); +kafka_producer_old_hocon(MQTTConfig) when is_list(MQTTConfig) -> +""" +bridges.kafka { + myproducer { + authentication = \"none\" + bootstrap_hosts = \"toxiproxy:9292\" + connect_timeout = \"5s\" + metadata_request_timeout = \"5s\" + min_metadata_refresh_interval = \"3s\" + producer { + kafka { + buffer { + memory_overload_protection = false + mode = \"memory\" + per_partition_limit = \"2GB\" + segment_bytes = \"100MB\" + } + compression = \"no_compression\" + max_batch_bytes = \"896KB\" + max_inflight = 10 + message { + key = \"${.clientid}\" + timestamp = \"${.timestamp}\" + value = \"${.}\" + } + partition_count_refresh_interval = \"60s\" + partition_strategy = \"random\" + required_acks = \"all_isr\" + topic = \"test-topic-two-partitions\" + } +""" ++ MQTTConfig ++ +""" + } + socket_opts { + nodelay = true + recbuf = \"1024KB\" + sndbuf = \"1024KB\" + } + ssl {enable = false, verify = \"verify_peer\"} + } +} +""". + +kafka_producer_new_hocon() -> + "" + "\n" + "bridges.kafka {\n" + " myproducer {\n" + " authentication = \"none\"\n" + " bootstrap_hosts = \"toxiproxy:9292\"\n" + " connect_timeout = \"5s\"\n" + " metadata_request_timeout = \"5s\"\n" + " min_metadata_refresh_interval = \"3s\"\n" + " kafka {\n" + " buffer {\n" + " memory_overload_protection = false\n" + " mode = \"memory\"\n" + " per_partition_limit = \"2GB\"\n" + " segment_bytes = \"100MB\"\n" + " }\n" + " compression = \"no_compression\"\n" + " max_batch_bytes = \"896KB\"\n" + " max_inflight = 10\n" + " message {\n" + " key = \"${.clientid}\"\n" + " timestamp = \"${.timestamp}\"\n" + " value = \"${.}\"\n" + " }\n" + " partition_count_refresh_interval = \"60s\"\n" + " partition_strategy = \"random\"\n" + " required_acks = \"all_isr\"\n" + " topic = \"test-topic-two-partitions\"\n" + " }\n" + " local_topic = \"mqtt/local\"\n" + " socket_opts {\n" + " nodelay = true\n" + " recbuf = \"1024KB\"\n" + " sndbuf = \"1024KB\"\n" + " }\n" + " ssl {enable = false, verify = \"verify_peer\"}\n" + " }\n" + "}\n" + "".