From 33eccb35dacde59a549d9310a2ed2e70cfa6b6f3 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Wed, 24 Jul 2024 15:49:58 -0300 Subject: [PATCH 1/5] chore: update wolff -> 3.0.2 --- apps/emqx_bridge_azure_event_hub/mix.exs | 2 +- apps/emqx_bridge_azure_event_hub/rebar.config | 2 +- apps/emqx_bridge_confluent/mix.exs | 2 +- apps/emqx_bridge_confluent/rebar.config | 2 +- apps/emqx_bridge_kafka/mix.exs | 2 +- apps/emqx_bridge_kafka/rebar.config | 2 +- .../test/emqx_bridge_kafka_impl_consumer_SUITE.erl | 6 +++--- mix.exs | 2 +- 8 files changed, 10 insertions(+), 10 deletions(-) diff --git a/apps/emqx_bridge_azure_event_hub/mix.exs b/apps/emqx_bridge_azure_event_hub/mix.exs index 42edddbbe..8f5068d0e 100644 --- a/apps/emqx_bridge_azure_event_hub/mix.exs +++ b/apps/emqx_bridge_azure_event_hub/mix.exs @@ -23,7 +23,7 @@ defmodule EMQXBridgeAzureEventHub.MixProject do def deps() do [ - {:wolff, github: "kafka4beam/wolff", tag: "2.0.0"}, + {:wolff, github: "kafka4beam/wolff", tag: "3.0.2"}, {:kafka_protocol, github: "kafka4beam/kafka_protocol", tag: "4.1.5", override: true}, {:brod_gssapi, github: "kafka4beam/brod_gssapi", tag: "v0.1.1"}, {:brod, github: "kafka4beam/brod", tag: "3.18.0"}, diff --git a/apps/emqx_bridge_azure_event_hub/rebar.config b/apps/emqx_bridge_azure_event_hub/rebar.config index 76ea7fa6c..c8be2a6a3 100644 --- a/apps/emqx_bridge_azure_event_hub/rebar.config +++ b/apps/emqx_bridge_azure_event_hub/rebar.config @@ -2,7 +2,7 @@ {erl_opts, [debug_info]}. {deps, [ - {wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "2.0.0"}}}, + {wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "3.0.2"}}}, {kafka_protocol, {git, "https://github.com/kafka4beam/kafka_protocol.git", {tag, "4.1.5"}}}, {brod_gssapi, {git, "https://github.com/kafka4beam/brod_gssapi.git", {tag, "v0.1.1"}}}, {brod, {git, "https://github.com/kafka4beam/brod.git", {tag, "3.18.0"}}}, diff --git a/apps/emqx_bridge_confluent/mix.exs b/apps/emqx_bridge_confluent/mix.exs index 46cbe9a02..134e924fc 100644 --- a/apps/emqx_bridge_confluent/mix.exs +++ b/apps/emqx_bridge_confluent/mix.exs @@ -23,7 +23,7 @@ defmodule EMQXBridgeConfluent.MixProject do def deps() do [ - {:wolff, github: "kafka4beam/wolff", tag: "2.0.0"}, + {:wolff, github: "kafka4beam/wolff", tag: "3.0.2"}, {:kafka_protocol, github: "kafka4beam/kafka_protocol", tag: "4.1.5", override: true}, {:brod_gssapi, github: "kafka4beam/brod_gssapi", tag: "v0.1.1"}, {:brod, github: "kafka4beam/brod", tag: "3.18.0"}, diff --git a/apps/emqx_bridge_confluent/rebar.config b/apps/emqx_bridge_confluent/rebar.config index 1a91f501d..786b1cf82 100644 --- a/apps/emqx_bridge_confluent/rebar.config +++ b/apps/emqx_bridge_confluent/rebar.config @@ -2,7 +2,7 @@ {erl_opts, [debug_info]}. {deps, [ - {wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "2.0.0"}}}, + {wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "3.0.2"}}}, {kafka_protocol, {git, "https://github.com/kafka4beam/kafka_protocol.git", {tag, "4.1.5"}}}, {brod_gssapi, {git, "https://github.com/kafka4beam/brod_gssapi.git", {tag, "v0.1.1"}}}, {brod, {git, "https://github.com/kafka4beam/brod.git", {tag, "3.18.0"}}}, diff --git a/apps/emqx_bridge_kafka/mix.exs b/apps/emqx_bridge_kafka/mix.exs index b74b1fdd0..a1a59cb08 100644 --- a/apps/emqx_bridge_kafka/mix.exs +++ b/apps/emqx_bridge_kafka/mix.exs @@ -23,7 +23,7 @@ defmodule EMQXBridgeKafka.MixProject do def deps() do [ - {:wolff, github: "kafka4beam/wolff", tag: "2.0.0"}, + {:wolff, github: "kafka4beam/wolff", tag: "3.0.2"}, {:kafka_protocol, github: "kafka4beam/kafka_protocol", tag: "4.1.5", override: true}, {:brod_gssapi, github: "kafka4beam/brod_gssapi", tag: "v0.1.1"}, {:brod, github: "kafka4beam/brod", tag: "3.18.0"}, diff --git a/apps/emqx_bridge_kafka/rebar.config b/apps/emqx_bridge_kafka/rebar.config index b89c9190f..77d9b95ef 100644 --- a/apps/emqx_bridge_kafka/rebar.config +++ b/apps/emqx_bridge_kafka/rebar.config @@ -2,7 +2,7 @@ {erl_opts, [debug_info]}. {deps, [ - {wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "2.0.0"}}}, + {wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "3.0.2"}}}, {kafka_protocol, {git, "https://github.com/kafka4beam/kafka_protocol.git", {tag, "4.1.5"}}}, {brod_gssapi, {git, "https://github.com/kafka4beam/brod_gssapi.git", {tag, "v0.1.1"}}}, {brod, {git, "https://github.com/kafka4beam/brod.git", {tag, "3.18.0"}}}, diff --git a/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_consumer_SUITE.erl b/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_consumer_SUITE.erl index 56aabb1c3..9119ee6c4 100644 --- a/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_consumer_SUITE.erl +++ b/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_consumer_SUITE.erl @@ -477,7 +477,7 @@ do_start_producer(KafkaClientId, KafkaTopic) -> ProducerConfig = #{ name => Name, - partitioner => roundrobin, + partitioner => random, partition_count_refresh_interval_seconds => 1_000, replayq_max_total_bytes => 10_000, replayq_seg_bytes => 9_000, @@ -1520,7 +1520,7 @@ t_receive_after_recovery(Config) -> key => <<"commit", (integer_to_binary(N))/binary>>, value => <<"commit", (integer_to_binary(N))/binary>> } - || N <- lists:seq(1, NPartitions) + || N <- lists:seq(1, NPartitions * 10) ], %% we do distinct passes over this producing part so that %% wolff won't batch everything together. @@ -1933,7 +1933,7 @@ t_node_joins_existing_cluster(Config) -> Val = <<"v", (integer_to_binary(N))/binary>>, publish(Config, KafkaTopic, [#{key => Key, value => Val}]) end, - lists:seq(1, NPartitions) + lists:seq(1, 10 * NPartitions) ), {ok, _} = snabbkaffe:receive_events(SRef1), diff --git a/mix.exs b/mix.exs index 53e5b304f..399c996a6 100644 --- a/mix.exs +++ b/mix.exs @@ -361,7 +361,7 @@ defmodule EMQXUmbrella.MixProject do {:hstreamdb_erl, github: "hstreamdb/hstreamdb_erl", tag: "0.5.18+v0.18.1+ezstd-v1.0.5-emqx1"}, {:influxdb, github: "emqx/influxdb-client-erl", tag: "1.1.13", override: true}, - {:wolff, github: "kafka4beam/wolff", tag: "2.0.0"}, + {:wolff, github: "kafka4beam/wolff", tag: "3.0.2"}, {:kafka_protocol, github: "kafka4beam/kafka_protocol", tag: "4.1.5", override: true}, {:brod_gssapi, github: "kafka4beam/brod_gssapi", tag: "v0.1.1"}, {:brod, github: "kafka4beam/brod", tag: "3.18.0"}, From df1f4fad7009cc88990491b4b346ac15d07cf7ca Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Wed, 10 Jul 2024 18:20:05 -0300 Subject: [PATCH 2/5] feat(kafka producer): allow dynamic topics from pre-configured topics Fixes https://emqx.atlassian.net/browse/EMQX-12656 --- .../emqx_bridge_azure_event_hub_v2_SUITE.erl | 34 +++ .../emqx_bridge_confluent_producer_SUITE.erl | 34 +++ .../src/emqx_bridge_kafka.erl | 45 +++- .../src/emqx_bridge_kafka_impl_producer.erl | 178 +++++++++---- ...emqx_bridge_kafka_producer_action_info.erl | 13 +- .../emqx_bridge_v2_kafka_producer_SUITE.erl | 238 +++++++++++++++++- changes/ee/feat-13452.en.md | 1 + rel/i18n/emqx_bridge_azure_event_hub.hocon | 12 +- rel/i18n/emqx_bridge_confluent_producer.hocon | 14 +- rel/i18n/emqx_bridge_kafka.hocon | 11 +- 10 files changed, 518 insertions(+), 62 deletions(-) create mode 100644 changes/ee/feat-13452.en.md diff --git a/apps/emqx_bridge_azure_event_hub/test/emqx_bridge_azure_event_hub_v2_SUITE.erl b/apps/emqx_bridge_azure_event_hub/test/emqx_bridge_azure_event_hub_v2_SUITE.erl index 661b8819c..0136ec568 100644 --- a/apps/emqx_bridge_azure_event_hub/test/emqx_bridge_azure_event_hub_v2_SUITE.erl +++ b/apps/emqx_bridge_azure_event_hub/test/emqx_bridge_azure_event_hub_v2_SUITE.erl @@ -391,3 +391,37 @@ t_multiple_actions_sharing_topic(Config) -> ] ), ok. + +t_pre_configured_topics(Config) -> + ActionConfig0 = ?config(action_config, Config), + ActionConfig = + emqx_utils_maps:deep_merge( + ActionConfig0, + #{<<"parameters">> => #{<<"query_mode">> => <<"sync">>}} + ), + ok = emqx_bridge_v2_kafka_producer_SUITE:t_pre_configured_topics( + [ + {type, ?BRIDGE_TYPE_BIN}, + {connector_name, ?config(connector_name, Config)}, + {connector_config, ?config(connector_config, Config)}, + {action_config, ActionConfig} + ] + ), + ok. + +t_templated_topic_and_no_pre_configured_topics(Config) -> + ActionConfig0 = ?config(action_config, Config), + ActionConfig = + emqx_utils_maps:deep_merge( + ActionConfig0, + #{<<"parameters">> => #{<<"query_mode">> => <<"sync">>}} + ), + ok = emqx_bridge_v2_kafka_producer_SUITE:t_templated_topic_and_no_pre_configured_topics( + [ + {type, ?BRIDGE_TYPE_BIN}, + {connector_name, ?config(connector_name, Config)}, + {connector_config, ?config(connector_config, Config)}, + {action_config, ActionConfig} + ] + ), + ok. diff --git a/apps/emqx_bridge_confluent/test/emqx_bridge_confluent_producer_SUITE.erl b/apps/emqx_bridge_confluent/test/emqx_bridge_confluent_producer_SUITE.erl index 0b3a22a99..de92b9327 100644 --- a/apps/emqx_bridge_confluent/test/emqx_bridge_confluent_producer_SUITE.erl +++ b/apps/emqx_bridge_confluent/test/emqx_bridge_confluent_producer_SUITE.erl @@ -400,3 +400,37 @@ t_multiple_actions_sharing_topic(Config) -> ] ), ok. + +t_pre_configured_topics(Config) -> + ActionConfig0 = ?config(action_config, Config), + ActionConfig = + emqx_utils_maps:deep_merge( + ActionConfig0, + #{<<"parameters">> => #{<<"query_mode">> => <<"sync">>}} + ), + ok = emqx_bridge_v2_kafka_producer_SUITE:t_pre_configured_topics( + [ + {type, ?ACTION_TYPE_BIN}, + {connector_name, ?config(connector_name, Config)}, + {connector_config, ?config(connector_config, Config)}, + {action_config, ActionConfig} + ] + ), + ok. + +t_templated_topic_and_no_pre_configured_topics(Config) -> + ActionConfig0 = ?config(action_config, Config), + ActionConfig = + emqx_utils_maps:deep_merge( + ActionConfig0, + #{<<"parameters">> => #{<<"query_mode">> => <<"sync">>}} + ), + ok = emqx_bridge_v2_kafka_producer_SUITE:t_templated_topic_and_no_pre_configured_topics( + [ + {type, ?ACTION_TYPE_BIN}, + {connector_name, ?config(connector_name, Config)}, + {connector_config, ?config(connector_config, Config)}, + {action_config, ActionConfig} + ] + ), + ok. diff --git a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl index 83bc33266..9a2fa91cf 100644 --- a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl +++ b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl @@ -295,6 +295,7 @@ fields("config_producer") -> fields("config_consumer") -> fields(kafka_consumer); fields(kafka_producer) -> + %% Schema used by bridges V1. connector_config_fields() ++ producer_opts(v1); fields(kafka_producer_action) -> [ @@ -306,6 +307,10 @@ fields(kafka_producer_action) -> {tags, emqx_schema:tags_schema()}, {description, emqx_schema:description_schema()} ] ++ producer_opts(action); +fields(pre_configured_topic) -> + [ + {topic, mk(string(), #{required => true, desc => ?DESC(kafka_topic)})} + ]; fields(kafka_consumer) -> connector_config_fields() ++ fields(consumer_opts); fields(ssl_client_opts) -> @@ -364,9 +369,41 @@ fields(socket_opts) -> validator => fun emqx_schema:validate_tcp_keepalive/1 })} ]; +fields(v1_producer_kafka_opts) -> + OldSchemaFields = + [ + topic, + message, + max_batch_bytes, + compression, + partition_strategy, + required_acks, + kafka_headers, + kafka_ext_headers, + kafka_header_value_encode_mode, + partition_count_refresh_interval, + partitions_limit, + max_inflight, + buffer, + query_mode, + sync_query_timeout + ], + Fields = fields(producer_kafka_opts), + lists:filter( + fun({K, _V}) -> lists:member(K, OldSchemaFields) end, + Fields + ); fields(producer_kafka_opts) -> [ {topic, mk(string(), #{required => true, desc => ?DESC(kafka_topic)})}, + {pre_configured_topics, + mk( + hoconsc:array(ref(pre_configured_topic)), + #{ + default => [], + desc => ?DESC("producer_pre_configured_topics") + } + )}, {message, mk(ref(kafka_message), #{required => false, desc => ?DESC(kafka_message)})}, {max_batch_bytes, mk(emqx_schema:bytesize(), #{default => <<"896KB">>, desc => ?DESC(max_batch_bytes)})}, @@ -675,15 +712,15 @@ resource_opts() -> %% However we need to keep it backward compatible for generated schema json (version 0.1.0) %% since schema is data for the 'schemas' API. parameters_field(ActionOrBridgeV1) -> - {Name, Alias} = + {Name, Alias, Ref} = case ActionOrBridgeV1 of v1 -> - {kafka, parameters}; + {kafka, parameters, v1_producer_kafka_opts}; action -> - {parameters, kafka} + {parameters, kafka, producer_kafka_opts} end, {Name, - mk(ref(producer_kafka_opts), #{ + mk(ref(Ref), #{ required => true, aliases => [Alias], desc => ?DESC(producer_kafka_opts), diff --git a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl index 6d88a329e..80de98402 100644 --- a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl +++ b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl @@ -122,8 +122,8 @@ on_add_channel( {ok, NewState}. create_producers_for_bridge_v2( - InstId, - BridgeV2Id, + ConnResId, + ActionResId, ClientId, #{ bridge_type := BridgeType, @@ -132,33 +132,57 @@ create_producers_for_bridge_v2( ) -> #{ message := MessageTemplate, - topic := KafkaTopic, + pre_configured_topics := PreConfiguredTopics0, + topic := KafkaTopic0, sync_query_timeout := SyncQueryTimeout } = KafkaConfig, + TopicTemplate = {TopicType, KafkaTopic} = maybe_preproc_topic(KafkaTopic0), + PreConfiguredTopics = [T || #{topic := T} <- PreConfiguredTopics0], + KafkaTopics0 = + case TopicType of + fixed -> + [KafkaTopic | PreConfiguredTopics]; + dynamic -> + PreConfiguredTopics + end, + case KafkaTopics0 of + [] -> + throw(<< + "Either the Kafka topic must be fixed (not a template)," + " or at least one pre-defined topic must be set." + >>); + _ -> + ok + end, + KafkaTopics = lists:map(fun bin/1, KafkaTopics0), KafkaHeadersTokens = preproc_kafka_headers(maps:get(kafka_headers, KafkaConfig, undefined)), KafkaExtHeadersTokens = preproc_ext_headers(maps:get(kafka_ext_headers, KafkaConfig, [])), KafkaHeadersValEncodeMode = maps:get(kafka_header_value_encode_mode, KafkaConfig, none), MaxPartitions = maps:get(partitions_limit, KafkaConfig, all_partitions), - #{name := BridgeName} = emqx_bridge_v2:parse_id(BridgeV2Id), - IsDryRun = emqx_resource:is_dry_run(BridgeV2Id), - ok = check_topic_and_leader_connections(ClientId, KafkaTopic, MaxPartitions), + #{name := BridgeName} = emqx_bridge_v2:parse_id(ActionResId), + IsDryRun = emqx_resource:is_dry_run(ActionResId), + [AKafkaTopic | _] = KafkaTopics, + ok = check_topic_and_leader_connections(ActionResId, ClientId, AKafkaTopic, MaxPartitions), WolffProducerConfig = producers_config( - BridgeType, BridgeName, KafkaConfig, IsDryRun, BridgeV2Id + BridgeType, BridgeName, KafkaConfig, IsDryRun, ActionResId ), - case wolff:ensure_supervised_producers(ClientId, KafkaTopic, WolffProducerConfig) of + case wolff:ensure_supervised_dynamic_producers(ClientId, WolffProducerConfig) of {ok, Producers} -> - ok = emqx_resource:allocate_resource(InstId, {?kafka_producers, BridgeV2Id}, Producers), ok = emqx_resource:allocate_resource( - InstId, {?kafka_telemetry_id, BridgeV2Id}, BridgeV2Id + ConnResId, {?kafka_producers, ActionResId}, Producers ), - _ = maybe_install_wolff_telemetry_handlers(BridgeV2Id), + ok = emqx_resource:allocate_resource( + ConnResId, {?kafka_telemetry_id, ActionResId}, ActionResId + ), + _ = maybe_install_wolff_telemetry_handlers(ActionResId), {ok, #{ message_template => compile_message_template(MessageTemplate), kafka_client_id => ClientId, - kafka_topic => KafkaTopic, + topic_template => TopicTemplate, + pre_configured_topics => KafkaTopics, producers => Producers, - resource_id => BridgeV2Id, - connector_resource_id => InstId, + resource_id => ActionResId, + connector_resource_id => ConnResId, sync_query_timeout => SyncQueryTimeout, kafka_config => KafkaConfig, headers_tokens => KafkaHeadersTokens, @@ -169,7 +193,7 @@ create_producers_for_bridge_v2( {error, Reason2} -> ?SLOG(error, #{ msg => "failed_to_start_kafka_producer", - instance_id => InstId, + instance_id => ConnResId, kafka_client_id => ClientId, kafka_topic => KafkaTopic, reason => Reason2 @@ -264,7 +288,9 @@ remove_producers_for_bridge_v2( ClientId = maps:get(?kafka_client_id, AllocatedResources, no_client_id), maps:foreach( fun - ({?kafka_producers, BridgeV2IdCheck}, Producers) when BridgeV2IdCheck =:= BridgeV2Id -> + ({?kafka_producers, BridgeV2IdCheck}, Producers) when + BridgeV2IdCheck =:= BridgeV2Id + -> deallocate_producers(ClientId, Producers); ({?kafka_telemetry_id, BridgeV2IdCheck}, TelemetryId) when BridgeV2IdCheck =:= BridgeV2Id @@ -297,8 +323,10 @@ on_query( #{installed_bridge_v2s := BridgeV2Configs} = _ConnectorState ) -> #{ - message_template := Template, + message_template := MessageTemplate, + topic_template := TopicTemplate, producers := Producers, + pre_configured_topics := PreConfiguredTopics, sync_query_timeout := SyncTimeout, headers_tokens := KafkaHeadersTokens, ext_headers_tokens := KafkaExtHeadersTokens, @@ -310,7 +338,14 @@ on_query( headers_val_encode_mode => KafkaHeadersValEncodeMode }, try - KafkaMessage = render_message(Template, KafkaHeaders, Message), + KafkaTopic = render_topic(TopicTemplate, Message), + case lists:member(KafkaTopic, PreConfiguredTopics) of + false -> + throw({unknown_topic, KafkaTopic}); + true -> + ok + end, + KafkaMessage = render_message(MessageTemplate, KafkaHeaders, Message), ?tp( emqx_bridge_kafka_impl_producer_sync_query, #{headers_config => KafkaHeaders, instance_id => InstId} @@ -318,9 +353,15 @@ on_query( emqx_trace:rendered_action_template(MessageTag, #{ message => KafkaMessage }), - do_send_msg(sync, KafkaMessage, Producers, SyncTimeout) + do_send_msg(sync, KafkaTopic, KafkaMessage, Producers, SyncTimeout) catch - error:{invalid_partition_count, Count, _Partitioner} -> + throw:bad_topic -> + ?tp("kafka_producer_failed_to_render_topic", #{}), + {error, {unrecoverable_error, failed_to_render_topic}}; + throw:{unknown_topic, Topic} -> + ?tp("kafka_producer_resolved_to_unknown_topic", #{}), + {error, {unrecoverable_error, {resolved_to_unknown_topic, Topic}}}; + throw:#{cause := invalid_partition_count, count := Count} -> ?tp("kafka_producer_invalid_partition_count", #{ action_id => MessageTag, query_mode => sync @@ -365,7 +406,9 @@ on_query_async( ) -> #{ message_template := Template, + topic_template := TopicTemplate, producers := Producers, + pre_configured_topics := PreConfiguredTopics, headers_tokens := KafkaHeadersTokens, ext_headers_tokens := KafkaExtHeadersTokens, headers_val_encode_mode := KafkaHeadersValEncodeMode @@ -376,6 +419,13 @@ on_query_async( headers_val_encode_mode => KafkaHeadersValEncodeMode }, try + KafkaTopic = render_topic(TopicTemplate, Message), + case lists:member(KafkaTopic, PreConfiguredTopics) of + false -> + throw({unknown_topic, KafkaTopic}); + true -> + ok + end, KafkaMessage = render_message(Template, KafkaHeaders, Message), ?tp( emqx_bridge_kafka_impl_producer_async_query, @@ -384,9 +434,15 @@ on_query_async( emqx_trace:rendered_action_template(MessageTag, #{ message => KafkaMessage }), - do_send_msg(async, KafkaMessage, Producers, AsyncReplyFn) + do_send_msg(async, KafkaTopic, KafkaMessage, Producers, AsyncReplyFn) catch - error:{invalid_partition_count, Count, _Partitioner} -> + throw:bad_topic -> + ?tp("kafka_producer_failed_to_render_topic", #{}), + {error, {unrecoverable_error, failed_to_render_topic}}; + throw:{unknown_topic, Topic} -> + ?tp("kafka_producer_resolved_to_unknown_topic", #{}), + {error, {unrecoverable_error, {resolved_to_unknown_topic, Topic}}}; + throw:#{cause := invalid_partition_count, count := Count} -> ?tp("kafka_producer_invalid_partition_count", #{ action_id => MessageTag, query_mode => async @@ -424,9 +480,28 @@ compile_message_template(T) -> timestamp => preproc_tmpl(TimestampTemplate) }. +maybe_preproc_topic(Topic) -> + Template = emqx_template:parse(Topic), + case emqx_template:placeholders(Template) of + [] -> + {fixed, bin(Topic)}; + [_ | _] -> + {dynamic, Template} + end. + preproc_tmpl(Tmpl) -> emqx_placeholder:preproc_tmpl(Tmpl). +render_topic({fixed, KafkaTopic}, _Message) -> + KafkaTopic; +render_topic({dynamic, Template}, Message) -> + try + iolist_to_binary(emqx_template:render_strict(Template, Message)) + catch + error:_Errors -> + throw(bad_topic) + end. + render_message( #{key := KeyTemplate, value := ValueTemplate, timestamp := TimestampTemplate}, #{ @@ -468,9 +543,11 @@ render_timestamp(Template, Message) -> erlang:system_time(millisecond) end. -do_send_msg(sync, KafkaMessage, Producers, SyncTimeout) -> +do_send_msg(sync, KafkaTopic, KafkaMessage, Producers, SyncTimeout) -> try - {_Partition, _Offset} = wolff:send_sync(Producers, [KafkaMessage], SyncTimeout), + {_Partition, _Offset} = wolff:send_sync2( + Producers, KafkaTopic, [KafkaMessage], SyncTimeout + ), ok catch error:{producer_down, _} = Reason -> @@ -478,7 +555,7 @@ do_send_msg(sync, KafkaMessage, Producers, SyncTimeout) -> error:timeout -> {error, timeout} end; -do_send_msg(async, KafkaMessage, Producers, AsyncReplyFn) -> +do_send_msg(async, KafkaTopic, KafkaMessage, Producers, AsyncReplyFn) -> %% * Must be a batch because wolff:send and wolff:send_sync are batch APIs %% * Must be a single element batch because wolff books calls, but not batch sizes %% for counters and gauges. @@ -486,7 +563,9 @@ do_send_msg(async, KafkaMessage, Producers, AsyncReplyFn) -> %% The retuned information is discarded here. %% If the producer process is down when sending, this function would %% raise an error exception which is to be caught by the caller of this callback - {_Partition, Pid} = wolff:send(Producers, Batch, {fun ?MODULE:on_kafka_ack/3, [AsyncReplyFn]}), + {_Partition, Pid} = wolff:send2( + Producers, KafkaTopic, Batch, {fun ?MODULE:on_kafka_ack/3, [AsyncReplyFn]} + ), %% this Pid is so far never used because Kafka producer is by-passing the buffer worker {ok, Pid}. @@ -527,20 +606,24 @@ on_get_status( end. on_get_channel_status( - _ResId, - ChannelId, + _ConnResId, + ActionResId, #{ client_id := ClientId, installed_bridge_v2s := Channels - } = _State + } = _ConnState ) -> %% Note: we must avoid returning `?status_disconnected' here. Returning %% `?status_disconnected' will make resource manager try to restart the producers / %% connector, thus potentially dropping data held in wolff producer's replayq. The %% only exception is if the topic does not exist ("unhealthy target"). - #{kafka_topic := KafkaTopic, partitions_limit := MaxPartitions} = maps:get(ChannelId, Channels), + #{ + pre_configured_topics := PreConfiguredTopics, + partitions_limit := MaxPartitions + } = maps:get(ActionResId, Channels), + [KafkaTopic | _] = PreConfiguredTopics, try - ok = check_topic_and_leader_connections(ClientId, KafkaTopic, MaxPartitions), + ok = check_topic_and_leader_connections(ActionResId, ClientId, KafkaTopic, MaxPartitions), ?status_connected catch throw:{unhealthy_target, Msg} -> @@ -549,11 +632,11 @@ on_get_channel_status( {?status_connecting, {K, E}} end. -check_topic_and_leader_connections(ClientId, KafkaTopic, MaxPartitions) -> +check_topic_and_leader_connections(ActionResId, ClientId, KafkaTopic, MaxPartitions) -> case wolff_client_sup:find_client(ClientId) of {ok, Pid} -> ok = check_topic_status(ClientId, Pid, KafkaTopic), - ok = check_if_healthy_leaders(ClientId, Pid, KafkaTopic, MaxPartitions); + ok = check_if_healthy_leaders(ActionResId, ClientId, Pid, KafkaTopic, MaxPartitions); {error, #{reason := no_such_client}} -> throw(#{ reason => cannot_find_kafka_client, @@ -591,8 +674,10 @@ error_summary(Map, [Error]) -> error_summary(Map, [Error | More]) -> Map#{first_error => Error, total_errors => length(More) + 1}. -check_if_healthy_leaders(ClientId, ClientPid, KafkaTopic, MaxPartitions) when is_pid(ClientPid) -> - case wolff_client:get_leader_connections(ClientPid, KafkaTopic, MaxPartitions) of +check_if_healthy_leaders(ActionResId, ClientId, ClientPid, KafkaTopic, MaxPartitions) when + is_pid(ClientPid) +-> + case wolff_client:get_leader_connections(ClientPid, ActionResId, KafkaTopic, MaxPartitions) of {ok, Leaders} -> %% Kafka is considered healthy as long as any of the partition leader is reachable. case lists:partition(fun({_Partition, Pid}) -> is_alive(Pid) end, Leaders) of @@ -654,7 +739,7 @@ ssl(#{enable := true} = SSL) -> ssl(_) -> false. -producers_config(BridgeType, BridgeName, Input, IsDryRun, BridgeV2Id) -> +producers_config(BridgeType, BridgeName, Input, IsDryRun, ActionResId) -> #{ max_batch_bytes := MaxBatchBytes, compression := Compression, @@ -696,8 +781,8 @@ producers_config(BridgeType, BridgeName, Input, IsDryRun, BridgeV2Id) -> max_batch_bytes => MaxBatchBytes, max_send_ahead => MaxInflight - 1, compression => Compression, - alias => BridgeV2Id, - telemetry_meta_data => #{bridge_id => BridgeV2Id}, + group => ActionResId, + telemetry_meta_data => #{bridge_id => ActionResId}, max_partitions => MaxPartitions }. @@ -773,20 +858,19 @@ handle_telemetry_event(_EventId, _Metrics, _MetaData, _HandlerConfig) -> %% Note: don't use the instance/manager ID, as that changes everytime %% the bridge is recreated, and will lead to multiplication of %% metrics. --spec telemetry_handler_id(resource_id()) -> binary(). -telemetry_handler_id(ResourceID) -> - <<"emqx-bridge-kafka-producer-", ResourceID/binary>>. +-spec telemetry_handler_id(action_resource_id()) -> binary(). +telemetry_handler_id(ActionResId) -> + <<"emqx-bridge-kafka-producer-", ActionResId/binary>>. -uninstall_telemetry_handlers(ResourceID) -> - HandlerID = telemetry_handler_id(ResourceID), - telemetry:detach(HandlerID). +uninstall_telemetry_handlers(TelemetryId) -> + telemetry:detach(TelemetryId). -maybe_install_wolff_telemetry_handlers(ResourceID) -> +maybe_install_wolff_telemetry_handlers(TelemetryId) -> %% Attach event handlers for Kafka telemetry events. If a handler with the %% handler id already exists, the attach_many function does nothing telemetry:attach_many( %% unique handler id - telemetry_handler_id(ResourceID), + telemetry_handler_id(TelemetryId), [ [wolff, dropped_queue_full], [wolff, queuing], @@ -798,7 +882,7 @@ maybe_install_wolff_telemetry_handlers(ResourceID) -> %% wolff producers; otherwise, multiple kafka producer bridges %% will install multiple handlers to the same wolff events, %% multiplying the metric counts... - #{bridge_id => ResourceID} + #{bridge_id => TelemetryId} ). preproc_kafka_headers(HeadersTmpl) when HeadersTmpl =:= <<>>; HeadersTmpl =:= undefined -> diff --git a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_producer_action_info.erl b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_producer_action_info.erl index d97e68ba6..b9e13e717 100644 --- a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_producer_action_info.erl +++ b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_producer_action_info.erl @@ -26,7 +26,12 @@ schema_module() -> emqx_bridge_kafka. connector_action_config_to_bridge_v1_config(ConnectorConfig, ActionConfig) -> BridgeV1Config1 = maps:remove(<<"connector">>, ActionConfig), BridgeV1Config2 = emqx_utils_maps:deep_merge(ConnectorConfig, BridgeV1Config1), - emqx_utils_maps:rename(<<"parameters">>, <<"kafka">>, BridgeV1Config2). + BridgeV1Config = emqx_utils_maps:rename(<<"parameters">>, <<"kafka">>, BridgeV1Config2), + maps:update_with( + <<"kafka">>, + fun(Params) -> maps:with(v1_parameters(), Params) end, + BridgeV1Config + ). bridge_v1_config_to_action_config(BridgeV1Conf0 = #{<<"producer">> := _}, ConnectorName) -> %% Ancient v1 config, when `kafka' key was wrapped by `producer' @@ -51,6 +56,12 @@ bridge_v1_config_to_action_config(BridgeV1Conf, ConnectorName) -> %% Internal helper functions %%------------------------------------------------------------------------------------------ +v1_parameters() -> + [ + to_bin(K) + || {K, _} <- emqx_bridge_kafka:fields(v1_producer_kafka_opts) + ]. + producer_action_field_keys() -> [ to_bin(K) diff --git a/apps/emqx_bridge_kafka/test/emqx_bridge_v2_kafka_producer_SUITE.erl b/apps/emqx_bridge_kafka/test/emqx_bridge_v2_kafka_producer_SUITE.erl index c26f5e94e..6246faaf1 100644 --- a/apps/emqx_bridge_kafka/test/emqx_bridge_v2_kafka_producer_SUITE.erl +++ b/apps/emqx_bridge_kafka/test/emqx_bridge_v2_kafka_producer_SUITE.erl @@ -23,6 +23,7 @@ -include_lib("snabbkaffe/include/snabbkaffe.hrl"). -include_lib("brod/include/brod.hrl"). -include_lib("emqx_resource/include/emqx_resource.hrl"). +-include_lib("emqx/include/asserts.hrl"). -import(emqx_common_test_helpers, [on_exit/1]). @@ -165,6 +166,9 @@ send_message(Type, ActionName) -> resolve_kafka_offset() -> KafkaTopic = emqx_bridge_kafka_impl_producer_SUITE:test_topic_one_partition(), + resolve_kafka_offset(KafkaTopic). + +resolve_kafka_offset(KafkaTopic) -> Partition = 0, Hosts = emqx_bridge_kafka_impl_producer_SUITE:kafka_hosts(), {ok, Offset0} = emqx_bridge_kafka_impl_producer_SUITE:resolve_kafka_offset( @@ -174,11 +178,32 @@ resolve_kafka_offset() -> check_kafka_message_payload(Offset, ExpectedPayload) -> KafkaTopic = emqx_bridge_kafka_impl_producer_SUITE:test_topic_one_partition(), + check_kafka_message_payload(KafkaTopic, Offset, ExpectedPayload). + +check_kafka_message_payload(KafkaTopic, Offset, ExpectedPayload) -> Partition = 0, Hosts = emqx_bridge_kafka_impl_producer_SUITE:kafka_hosts(), {ok, {_, [KafkaMsg0]}} = brod:fetch(Hosts, KafkaTopic, Partition, Offset), ?assertMatch(#kafka_message{value = ExpectedPayload}, KafkaMsg0). +ensure_kafka_topic(KafkaTopic) -> + TopicConfigs = [ + #{ + name => KafkaTopic, + num_partitions => 1, + replication_factor => 1, + assignments => [], + configs => [] + } + ], + RequestConfig = #{timeout => 5_000}, + ConnConfig = #{}, + Endpoints = emqx_bridge_kafka_impl_producer_SUITE:kafka_hosts(), + case brod:create_topics(Endpoints, TopicConfigs, RequestConfig, ConnConfig) of + ok -> ok; + {error, topic_already_exists} -> ok + end. + action_config(ConnectorName) -> action_config(ConnectorName, _Overrides = #{}). @@ -728,9 +753,13 @@ t_invalid_partition_count_metrics(Config) -> %% Simulate `invalid_partition_count' emqx_common_test_helpers:with_mock( wolff, - send_sync, - fun(_Producers, _Msgs, _Timeout) -> - error({invalid_partition_count, 0, partitioner}) + send_sync2, + fun(_Producers, _Topic, _Msgs, _Timeout) -> + throw(#{ + cause => invalid_partition_count, + count => 0, + partitioner => partitioner + }) end, fun() -> {{ok, _}, {ok, _}} = @@ -773,9 +802,13 @@ t_invalid_partition_count_metrics(Config) -> %% Simulate `invalid_partition_count' emqx_common_test_helpers:with_mock( wolff, - send, - fun(_Producers, _Msgs, _Timeout) -> - error({invalid_partition_count, 0, partitioner}) + send2, + fun(_Producers, _Topic, _Msgs, _AckCallback) -> + throw(#{ + cause => invalid_partition_count, + count => 0, + partitioner => partitioner + }) end, fun() -> {{ok, _}, {ok, _}} = @@ -881,3 +914,196 @@ t_multiple_actions_sharing_topic(Config) -> end ), ok. + +%% Smoke tests for using a templated topic and a list of pre-configured kafka topics. +t_pre_configured_topics(Config) -> + Type = proplists:get_value(type, Config, ?TYPE), + ConnectorName = proplists:get_value(connector_name, Config, <<"c">>), + ConnectorConfig = proplists:get_value(connector_config, Config, connector_config()), + ActionName = <<"pre_configured_topics">>, + ActionConfig1 = proplists:get_value(action_config, Config, action_config(ConnectorName)), + PreConfigureTopic1 = <<"pct1">>, + PreConfigureTopic2 = <<"pct2">>, + ensure_kafka_topic(PreConfigureTopic1), + ensure_kafka_topic(PreConfigureTopic2), + ActionConfig = emqx_bridge_v2_testlib:parse_and_check( + action, + Type, + ActionName, + emqx_utils_maps:deep_merge( + ActionConfig1, + #{ + <<"parameters">> => #{ + <<"topic">> => <<"pct${.payload.n}">>, + <<"message">> => #{ + <<"key">> => <<"${.clientid}">>, + <<"value">> => <<"${.payload.p}">> + }, + <<"pre_configured_topics">> => [ + #{<<"topic">> => PreConfigureTopic1}, + #{<<"topic">> => PreConfigureTopic2} + ] + } + } + ) + ), + ?check_trace( + #{timetrap => 7_000}, + begin + ConnectorParams = [ + {connector_config, ConnectorConfig}, + {connector_name, ConnectorName}, + {connector_type, Type} + ], + ActionParams = [ + {action_config, ActionConfig}, + {action_name, ActionName}, + {action_type, Type} + ], + {ok, {{_, 201, _}, _, #{}}} = + emqx_bridge_v2_testlib:create_connector_api(ConnectorParams), + + {ok, {{_, 201, _}, _, #{}}} = + emqx_bridge_v2_testlib:create_action_api(ActionParams), + RuleTopic = <<"pct">>, + {ok, _} = emqx_bridge_v2_testlib:create_rule_and_action_http( + Type, + RuleTopic, + [ + {bridge_name, ActionName} + ], + #{ + sql => + <<"select *, json_decode(payload) as payload from \"", RuleTopic/binary, + "\" ">> + } + ), + ?assertStatusAPI(Type, ActionName, <<"connected">>), + + HandlerId = ?FUNCTION_NAME, + TestPid = self(), + telemetry:attach_many( + HandlerId, + emqx_resource_metrics:events(), + fun(EventName, Measurements, Metadata, _Config) -> + Data = #{ + name => EventName, + measurements => Measurements, + metadata => Metadata + }, + TestPid ! {telemetry, Data}, + ok + end, + unused_config + ), + on_exit(fun() -> telemetry:detach(HandlerId) end), + + {ok, C} = emqtt:start_link(#{}), + {ok, _} = emqtt:connect(C), + Payload = fun(Map) -> emqx_utils_json:encode(Map) end, + Offset1 = resolve_kafka_offset(PreConfigureTopic1), + Offset2 = resolve_kafka_offset(PreConfigureTopic2), + {ok, _} = emqtt:publish(C, RuleTopic, Payload(#{n => 1, p => <<"p1">>}), [{qos, 1}]), + {ok, _} = emqtt:publish(C, RuleTopic, Payload(#{n => 2, p => <<"p2">>}), [{qos, 1}]), + + check_kafka_message_payload(PreConfigureTopic1, Offset1, <<"p1">>), + check_kafka_message_payload(PreConfigureTopic2, Offset2, <<"p2">>), + + ActionId = emqx_bridge_v2:id(Type, ActionName), + ?assertEqual(2, emqx_resource_metrics:matched_get(ActionId)), + ?assertEqual(2, emqx_resource_metrics:success_get(ActionId)), + ?assertEqual(0, emqx_resource_metrics:queuing_get(ActionId)), + + ?assertReceive( + {telemetry, #{ + measurements := #{gauge_set := _}, + metadata := #{worker_id := _, resource_id := ActionId} + }} + ), + + %% If there isn't enough information in the context to resolve to a topic, it + %% should be an unrecoverable error. + ?assertMatch( + {_, {ok, _}}, + ?wait_async_action( + emqtt:publish(C, RuleTopic, Payload(#{not_enough => <<"info">>}), [{qos, 1}]), + #{?snk_kind := "kafka_producer_failed_to_render_topic"} + ) + ), + + %% If it's possible to render the topic, but it isn't in the pre-configured + %% list, it should be an unrecoverable error. + ?assertMatch( + {_, {ok, _}}, + ?wait_async_action( + emqtt:publish(C, RuleTopic, Payload(#{n => 99}), [{qos, 1}]), + #{?snk_kind := "kafka_producer_resolved_to_unknown_topic"} + ) + ), + + ok + end, + [] + ), + ok. + +%% Checks that creating an action with templated topic and no pre-configured kafka topics +%% throws. +t_templated_topic_and_no_pre_configured_topics(Config) -> + Type = proplists:get_value(type, Config, ?TYPE), + ConnectorName = proplists:get_value(connector_name, Config, <<"c">>), + ConnectorConfig = proplists:get_value(connector_config, Config, connector_config()), + ActionName = <<"bad_pre_configured_topics">>, + ActionConfig1 = proplists:get_value(action_config, Config, action_config(ConnectorName)), + ActionConfig = emqx_bridge_v2_testlib:parse_and_check( + action, + Type, + ActionName, + emqx_utils_maps:deep_merge( + ActionConfig1, + #{ + <<"parameters">> => #{ + <<"topic">> => <<"pct${.payload.n}">>, + <<"pre_configured_topics">> => [] + } + } + ) + ), + ?check_trace( + #{timetrap => 7_000}, + begin + ConnectorParams = [ + {connector_config, ConnectorConfig}, + {connector_name, ConnectorName}, + {connector_type, Type} + ], + ActionParams = [ + {action_config, ActionConfig}, + {action_name, ActionName}, + {action_type, Type} + ], + {ok, {{_, 201, _}, _, #{}}} = + emqx_bridge_v2_testlib:create_connector_api(ConnectorParams), + + {ok, {{_, 201, _}, _, #{}}} = + emqx_bridge_v2_testlib:create_action_api(ActionParams), + + ?assertMatch( + {ok, + {{_, 200, _}, _, #{ + <<"status_reason">> := + << + "Either the Kafka topic must be fixed (not a template)," + " or at least one pre-defined topic must be set." + >>, + <<"status">> := <<"disconnected">>, + <<"node_status">> := [#{<<"status">> := <<"disconnected">>}] + }}}, + emqx_bridge_v2_testlib:get_bridge_api(Type, ActionName) + ), + + ok + end, + [] + ), + ok. diff --git a/changes/ee/feat-13452.en.md b/changes/ee/feat-13452.en.md new file mode 100644 index 000000000..95dae8d32 --- /dev/null +++ b/changes/ee/feat-13452.en.md @@ -0,0 +1 @@ +Added to possibility to configure a list of predefined Kafka topics to Kafka producer actions, and also to use templates to define the destination Kafka topic. diff --git a/rel/i18n/emqx_bridge_azure_event_hub.hocon b/rel/i18n/emqx_bridge_azure_event_hub.hocon index 3b96e23e6..e683bc9e9 100644 --- a/rel/i18n/emqx_bridge_azure_event_hub.hocon +++ b/rel/i18n/emqx_bridge_azure_event_hub.hocon @@ -69,7 +69,7 @@ producer_kafka_opts.label: """Azure Event Hubs Producer""" kafka_topic.desc: -"""Event Hubs name""" +"""Event Hubs name. Supports templates (e.g.: `t-${payload.t}`).""" kafka_topic.label: """Event Hubs Name""" @@ -350,4 +350,14 @@ Setting this to a value which is greater than the total number of partitions in partitions_limit.label: """Max Partitions""" +producer_pre_configured_topics.label: +"""Pre-configured Event Hubs""" +producer_pre_configured_topics.desc: +"""A list of pre-configured event hubs to be used when using templates to define outgoing topics. If the topic template fails to resolve to a value due to missing data in the incoming message, or if it resolves to a topic that is not contained in this list, then publishing will fail.""" + +pre_configured_topic.label: +"""Event Hubs Name""" +pre_configured_topic.desc: +"""Event Hubs name""" + } diff --git a/rel/i18n/emqx_bridge_confluent_producer.hocon b/rel/i18n/emqx_bridge_confluent_producer.hocon index 748373691..81c2c0a89 100644 --- a/rel/i18n/emqx_bridge_confluent_producer.hocon +++ b/rel/i18n/emqx_bridge_confluent_producer.hocon @@ -69,10 +69,10 @@ producer_kafka_opts.label: """Confluent Producer""" kafka_topic.desc: -"""Event Hub name""" +"""Kafka topic name. Supports templates (e.g.: `t-${payload.t}`).""" kafka_topic.label: -"""Event Hub Name""" +"""Kafka Topic Name""" kafka_message_timestamp.desc: """Which timestamp to use. The timestamp is expected to be a millisecond precision Unix epoch which can be in string format, e.g. 1661326462115 or '1661326462115'. When the desired data field for this template is not found, or if the found data is not a valid integer, the current system timestamp will be used.""" @@ -350,4 +350,14 @@ server_name_indication.desc: server_name_indication.label: """SNI""" +producer_pre_configured_topics.label: +"""Pre-configured Topics""" +producer_pre_configured_topics.desc: +"""A list of pre-configured topics to be used when using templates to define outgoing topics. If the topic template fails to resolve to a value due to missing data in the incoming message, or if it resolves to a topic that is not contained in this list, then publishing will fail.""" + +pre_configured_topic.label: +"""Kafka Topic Name""" +pre_configured_topic.desc: +"""Kafka topic name""" + } diff --git a/rel/i18n/emqx_bridge_kafka.hocon b/rel/i18n/emqx_bridge_kafka.hocon index 6e0074ddd..59896cc22 100644 --- a/rel/i18n/emqx_bridge_kafka.hocon +++ b/rel/i18n/emqx_bridge_kafka.hocon @@ -81,7 +81,7 @@ producer_kafka_opts.label: """Kafka Producer""" kafka_topic.desc: -"""Kafka topic name""" +"""Kafka topic name. Supports templates (e.g.: `t-${payload.t}`).""" kafka_topic.label: """Kafka Topic Name""" @@ -446,5 +446,14 @@ server_name_indication.desc: server_name_indication.label: """SNI""" +producer_pre_configured_topics.label: +"""Pre-configured Topics""" +producer_pre_configured_topics.desc: +"""A list of pre-configured topics to be used when using templates to define outgoing topics. If the topic template fails to resolve to a value due to missing data in the incoming message, or if it resolves to a topic that is not contained in this list, then publishing will fail.""" + +pre_configured_topic.label: +"""Kafka Topic Name""" +pre_configured_topic.desc: +"""Kafka topic name""" } From 4e0742c66facdf0f290157b832e5271ee045e42b Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Fri, 26 Jul 2024 14:25:20 -0300 Subject: [PATCH 3/5] feat: make kafka producer freely dynamic --- .../emqx_bridge_azure_event_hub_v2_SUITE.erl | 51 ++++------- .../emqx_bridge_confluent_producer_SUITE.erl | 51 ++++------- .../src/emqx_bridge_kafka.erl | 12 --- .../src/emqx_bridge_kafka_impl_producer.erl | 71 ++++++--------- .../emqx_bridge_v2_kafka_producer_SUITE.erl | 89 +++---------------- rel/i18n/emqx_bridge_azure_event_hub.hocon | 10 --- rel/i18n/emqx_bridge_confluent_producer.hocon | 10 --- rel/i18n/emqx_bridge_kafka.hocon | 10 --- 8 files changed, 73 insertions(+), 231 deletions(-) diff --git a/apps/emqx_bridge_azure_event_hub/test/emqx_bridge_azure_event_hub_v2_SUITE.erl b/apps/emqx_bridge_azure_event_hub/test/emqx_bridge_azure_event_hub_v2_SUITE.erl index 0136ec568..f2a06cf65 100644 --- a/apps/emqx_bridge_azure_event_hub/test/emqx_bridge_azure_event_hub_v2_SUITE.erl +++ b/apps/emqx_bridge_azure_event_hub/test/emqx_bridge_azure_event_hub_v2_SUITE.erl @@ -382,46 +382,31 @@ t_multiple_actions_sharing_topic(Config) -> ActionConfig0, #{<<"parameters">> => #{<<"query_mode">> => <<"sync">>}} ), - ok = emqx_bridge_v2_kafka_producer_SUITE:t_multiple_actions_sharing_topic( - [ - {type, ?BRIDGE_TYPE_BIN}, - {connector_name, ?config(connector_name, Config)}, - {connector_config, ?config(connector_config, Config)}, - {action_config, ActionConfig} - ] - ), + ok = + emqx_bridge_v2_kafka_producer_SUITE:?FUNCTION_NAME( + [ + {type, ?BRIDGE_TYPE_BIN}, + {connector_name, ?config(connector_name, Config)}, + {connector_config, ?config(connector_config, Config)}, + {action_config, ActionConfig} + ] + ), ok. -t_pre_configured_topics(Config) -> +t_dynamic_topics(Config) -> ActionConfig0 = ?config(action_config, Config), ActionConfig = emqx_utils_maps:deep_merge( ActionConfig0, #{<<"parameters">> => #{<<"query_mode">> => <<"sync">>}} ), - ok = emqx_bridge_v2_kafka_producer_SUITE:t_pre_configured_topics( - [ - {type, ?BRIDGE_TYPE_BIN}, - {connector_name, ?config(connector_name, Config)}, - {connector_config, ?config(connector_config, Config)}, - {action_config, ActionConfig} - ] - ), - ok. - -t_templated_topic_and_no_pre_configured_topics(Config) -> - ActionConfig0 = ?config(action_config, Config), - ActionConfig = - emqx_utils_maps:deep_merge( - ActionConfig0, - #{<<"parameters">> => #{<<"query_mode">> => <<"sync">>}} + ok = + emqx_bridge_v2_kafka_producer_SUITE:?FUNCTION_NAME( + [ + {type, ?BRIDGE_TYPE_BIN}, + {connector_name, ?config(connector_name, Config)}, + {connector_config, ?config(connector_config, Config)}, + {action_config, ActionConfig} + ] ), - ok = emqx_bridge_v2_kafka_producer_SUITE:t_templated_topic_and_no_pre_configured_topics( - [ - {type, ?BRIDGE_TYPE_BIN}, - {connector_name, ?config(connector_name, Config)}, - {connector_config, ?config(connector_config, Config)}, - {action_config, ActionConfig} - ] - ), ok. diff --git a/apps/emqx_bridge_confluent/test/emqx_bridge_confluent_producer_SUITE.erl b/apps/emqx_bridge_confluent/test/emqx_bridge_confluent_producer_SUITE.erl index de92b9327..f10e88463 100644 --- a/apps/emqx_bridge_confluent/test/emqx_bridge_confluent_producer_SUITE.erl +++ b/apps/emqx_bridge_confluent/test/emqx_bridge_confluent_producer_SUITE.erl @@ -391,46 +391,31 @@ t_multiple_actions_sharing_topic(Config) -> ActionConfig0, #{<<"parameters">> => #{<<"query_mode">> => <<"sync">>}} ), - ok = emqx_bridge_v2_kafka_producer_SUITE:t_multiple_actions_sharing_topic( - [ - {type, ?ACTION_TYPE_BIN}, - {connector_name, ?config(connector_name, Config)}, - {connector_config, ?config(connector_config, Config)}, - {action_config, ActionConfig} - ] - ), + ok = + emqx_bridge_v2_kafka_producer_SUITE:?FUNCTION_NAME( + [ + {type, ?ACTION_TYPE_BIN}, + {connector_name, ?config(connector_name, Config)}, + {connector_config, ?config(connector_config, Config)}, + {action_config, ActionConfig} + ] + ), ok. -t_pre_configured_topics(Config) -> +t_dynamic_topics(Config) -> ActionConfig0 = ?config(action_config, Config), ActionConfig = emqx_utils_maps:deep_merge( ActionConfig0, #{<<"parameters">> => #{<<"query_mode">> => <<"sync">>}} ), - ok = emqx_bridge_v2_kafka_producer_SUITE:t_pre_configured_topics( - [ - {type, ?ACTION_TYPE_BIN}, - {connector_name, ?config(connector_name, Config)}, - {connector_config, ?config(connector_config, Config)}, - {action_config, ActionConfig} - ] - ), - ok. - -t_templated_topic_and_no_pre_configured_topics(Config) -> - ActionConfig0 = ?config(action_config, Config), - ActionConfig = - emqx_utils_maps:deep_merge( - ActionConfig0, - #{<<"parameters">> => #{<<"query_mode">> => <<"sync">>}} + ok = + emqx_bridge_v2_kafka_producer_SUITE:?FUNCTION_NAME( + [ + {type, ?ACTION_TYPE_BIN}, + {connector_name, ?config(connector_name, Config)}, + {connector_config, ?config(connector_config, Config)}, + {action_config, ActionConfig} + ] ), - ok = emqx_bridge_v2_kafka_producer_SUITE:t_templated_topic_and_no_pre_configured_topics( - [ - {type, ?ACTION_TYPE_BIN}, - {connector_name, ?config(connector_name, Config)}, - {connector_config, ?config(connector_config, Config)}, - {action_config, ActionConfig} - ] - ), ok. diff --git a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl index 9a2fa91cf..8f72523b1 100644 --- a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl +++ b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl @@ -307,10 +307,6 @@ fields(kafka_producer_action) -> {tags, emqx_schema:tags_schema()}, {description, emqx_schema:description_schema()} ] ++ producer_opts(action); -fields(pre_configured_topic) -> - [ - {topic, mk(string(), #{required => true, desc => ?DESC(kafka_topic)})} - ]; fields(kafka_consumer) -> connector_config_fields() ++ fields(consumer_opts); fields(ssl_client_opts) -> @@ -396,14 +392,6 @@ fields(v1_producer_kafka_opts) -> fields(producer_kafka_opts) -> [ {topic, mk(string(), #{required => true, desc => ?DESC(kafka_topic)})}, - {pre_configured_topics, - mk( - hoconsc:array(ref(pre_configured_topic)), - #{ - default => [], - desc => ?DESC("producer_pre_configured_topics") - } - )}, {message, mk(ref(kafka_message), #{required => false, desc => ?DESC(kafka_message)})}, {max_batch_bytes, mk(emqx_schema:bytesize(), #{default => <<"896KB">>, desc => ?DESC(max_batch_bytes)})}, diff --git a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl index 80de98402..b358cd42b 100644 --- a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl +++ b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl @@ -3,6 +3,8 @@ %%-------------------------------------------------------------------- -module(emqx_bridge_kafka_impl_producer). +-feature(maybe_expr, enable). + -behaviour(emqx_resource). -include_lib("emqx_resource/include/emqx_resource.hrl"). @@ -132,37 +134,22 @@ create_producers_for_bridge_v2( ) -> #{ message := MessageTemplate, - pre_configured_topics := PreConfiguredTopics0, topic := KafkaTopic0, sync_query_timeout := SyncQueryTimeout } = KafkaConfig, - TopicTemplate = {TopicType, KafkaTopic} = maybe_preproc_topic(KafkaTopic0), - PreConfiguredTopics = [T || #{topic := T} <- PreConfiguredTopics0], - KafkaTopics0 = + TopicTemplate = {TopicType, TopicOrTemplate} = maybe_preproc_topic(KafkaTopic0), + MKafkaTopic = case TopicType of - fixed -> - [KafkaTopic | PreConfiguredTopics]; - dynamic -> - PreConfiguredTopics + fixed -> TopicOrTemplate; + dynamic -> dynamic end, - case KafkaTopics0 of - [] -> - throw(<< - "Either the Kafka topic must be fixed (not a template)," - " or at least one pre-defined topic must be set." - >>); - _ -> - ok - end, - KafkaTopics = lists:map(fun bin/1, KafkaTopics0), KafkaHeadersTokens = preproc_kafka_headers(maps:get(kafka_headers, KafkaConfig, undefined)), KafkaExtHeadersTokens = preproc_ext_headers(maps:get(kafka_ext_headers, KafkaConfig, [])), KafkaHeadersValEncodeMode = maps:get(kafka_header_value_encode_mode, KafkaConfig, none), MaxPartitions = maps:get(partitions_limit, KafkaConfig, all_partitions), #{name := BridgeName} = emqx_bridge_v2:parse_id(ActionResId), IsDryRun = emqx_resource:is_dry_run(ActionResId), - [AKafkaTopic | _] = KafkaTopics, - ok = check_topic_and_leader_connections(ActionResId, ClientId, AKafkaTopic, MaxPartitions), + ok = check_topic_and_leader_connections(ActionResId, ClientId, MKafkaTopic, MaxPartitions), WolffProducerConfig = producers_config( BridgeType, BridgeName, KafkaConfig, IsDryRun, ActionResId ), @@ -179,7 +166,7 @@ create_producers_for_bridge_v2( message_template => compile_message_template(MessageTemplate), kafka_client_id => ClientId, topic_template => TopicTemplate, - pre_configured_topics => KafkaTopics, + topic => MKafkaTopic, producers => Producers, resource_id => ActionResId, connector_resource_id => ConnResId, @@ -195,7 +182,7 @@ create_producers_for_bridge_v2( msg => "failed_to_start_kafka_producer", instance_id => ConnResId, kafka_client_id => ClientId, - kafka_topic => KafkaTopic, + kafka_topic => MKafkaTopic, reason => Reason2 }), throw( @@ -326,7 +313,6 @@ on_query( message_template := MessageTemplate, topic_template := TopicTemplate, producers := Producers, - pre_configured_topics := PreConfiguredTopics, sync_query_timeout := SyncTimeout, headers_tokens := KafkaHeadersTokens, ext_headers_tokens := KafkaExtHeadersTokens, @@ -339,12 +325,6 @@ on_query( }, try KafkaTopic = render_topic(TopicTemplate, Message), - case lists:member(KafkaTopic, PreConfiguredTopics) of - false -> - throw({unknown_topic, KafkaTopic}); - true -> - ok - end, KafkaMessage = render_message(MessageTemplate, KafkaHeaders, Message), ?tp( emqx_bridge_kafka_impl_producer_sync_query, @@ -358,7 +338,7 @@ on_query( throw:bad_topic -> ?tp("kafka_producer_failed_to_render_topic", #{}), {error, {unrecoverable_error, failed_to_render_topic}}; - throw:{unknown_topic, Topic} -> + throw:#{cause := unknown_topic_or_partition, topic := Topic} -> ?tp("kafka_producer_resolved_to_unknown_topic", #{}), {error, {unrecoverable_error, {resolved_to_unknown_topic, Topic}}}; throw:#{cause := invalid_partition_count, count := Count} -> @@ -408,7 +388,6 @@ on_query_async( message_template := Template, topic_template := TopicTemplate, producers := Producers, - pre_configured_topics := PreConfiguredTopics, headers_tokens := KafkaHeadersTokens, ext_headers_tokens := KafkaExtHeadersTokens, headers_val_encode_mode := KafkaHeadersValEncodeMode @@ -420,12 +399,6 @@ on_query_async( }, try KafkaTopic = render_topic(TopicTemplate, Message), - case lists:member(KafkaTopic, PreConfiguredTopics) of - false -> - throw({unknown_topic, KafkaTopic}); - true -> - ok - end, KafkaMessage = render_message(Template, KafkaHeaders, Message), ?tp( emqx_bridge_kafka_impl_producer_async_query, @@ -439,7 +412,7 @@ on_query_async( throw:bad_topic -> ?tp("kafka_producer_failed_to_render_topic", #{}), {error, {unrecoverable_error, failed_to_render_topic}}; - throw:{unknown_topic, Topic} -> + throw:#{cause := unknown_topic_or_partition, topic := Topic} -> ?tp("kafka_producer_resolved_to_unknown_topic", #{}), {error, {unrecoverable_error, {resolved_to_unknown_topic, Topic}}}; throw:#{cause := invalid_partition_count, count := Count} -> @@ -618,12 +591,11 @@ on_get_channel_status( %% connector, thus potentially dropping data held in wolff producer's replayq. The %% only exception is if the topic does not exist ("unhealthy target"). #{ - pre_configured_topics := PreConfiguredTopics, + topic := MKafkaTopic, partitions_limit := MaxPartitions } = maps:get(ActionResId, Channels), - [KafkaTopic | _] = PreConfiguredTopics, try - ok = check_topic_and_leader_connections(ActionResId, ClientId, KafkaTopic, MaxPartitions), + ok = check_topic_and_leader_connections(ActionResId, ClientId, MKafkaTopic, MaxPartitions), ?status_connected catch throw:{unhealthy_target, Msg} -> @@ -632,22 +604,29 @@ on_get_channel_status( {?status_connecting, {K, E}} end. -check_topic_and_leader_connections(ActionResId, ClientId, KafkaTopic, MaxPartitions) -> +check_topic_and_leader_connections(ActionResId, ClientId, MKafkaTopic, MaxPartitions) -> case wolff_client_sup:find_client(ClientId) of {ok, Pid} -> - ok = check_topic_status(ClientId, Pid, KafkaTopic), - ok = check_if_healthy_leaders(ActionResId, ClientId, Pid, KafkaTopic, MaxPartitions); + maybe + true ?= is_binary(MKafkaTopic), + ok = check_topic_status(ClientId, Pid, MKafkaTopic), + ok = check_if_healthy_leaders( + ActionResId, ClientId, Pid, MKafkaTopic, MaxPartitions + ) + else + false -> ok + end; {error, #{reason := no_such_client}} -> throw(#{ reason => cannot_find_kafka_client, kafka_client => ClientId, - kafka_topic => KafkaTopic + kafka_topic => MKafkaTopic }); {error, #{reason := client_supervisor_not_initialized}} -> throw(#{ reason => restarting, kafka_client => ClientId, - kafka_topic => KafkaTopic + kafka_topic => MKafkaTopic }) end. diff --git a/apps/emqx_bridge_kafka/test/emqx_bridge_v2_kafka_producer_SUITE.erl b/apps/emqx_bridge_kafka/test/emqx_bridge_v2_kafka_producer_SUITE.erl index 6246faaf1..08b2723e7 100644 --- a/apps/emqx_bridge_kafka/test/emqx_bridge_v2_kafka_producer_SUITE.erl +++ b/apps/emqx_bridge_kafka/test/emqx_bridge_v2_kafka_producer_SUITE.erl @@ -915,17 +915,17 @@ t_multiple_actions_sharing_topic(Config) -> ), ok. -%% Smoke tests for using a templated topic and a list of pre-configured kafka topics. -t_pre_configured_topics(Config) -> +%% Smoke tests for using a templated topic and adynamic kafka topics. +t_dynamic_topics(Config) -> Type = proplists:get_value(type, Config, ?TYPE), ConnectorName = proplists:get_value(connector_name, Config, <<"c">>), ConnectorConfig = proplists:get_value(connector_config, Config, connector_config()), - ActionName = <<"pre_configured_topics">>, + ActionName = <<"dynamic_topics">>, ActionConfig1 = proplists:get_value(action_config, Config, action_config(ConnectorName)), - PreConfigureTopic1 = <<"pct1">>, - PreConfigureTopic2 = <<"pct2">>, - ensure_kafka_topic(PreConfigureTopic1), - ensure_kafka_topic(PreConfigureTopic2), + PreConfiguredTopic1 = <<"pct1">>, + PreConfiguredTopic2 = <<"pct2">>, + ensure_kafka_topic(PreConfiguredTopic1), + ensure_kafka_topic(PreConfiguredTopic2), ActionConfig = emqx_bridge_v2_testlib:parse_and_check( action, Type, @@ -938,11 +938,7 @@ t_pre_configured_topics(Config) -> <<"message">> => #{ <<"key">> => <<"${.clientid}">>, <<"value">> => <<"${.payload.p}">> - }, - <<"pre_configured_topics">> => [ - #{<<"topic">> => PreConfigureTopic1}, - #{<<"topic">> => PreConfigureTopic2} - ] + } } } ) @@ -1001,13 +997,13 @@ t_pre_configured_topics(Config) -> {ok, C} = emqtt:start_link(#{}), {ok, _} = emqtt:connect(C), Payload = fun(Map) -> emqx_utils_json:encode(Map) end, - Offset1 = resolve_kafka_offset(PreConfigureTopic1), - Offset2 = resolve_kafka_offset(PreConfigureTopic2), + Offset1 = resolve_kafka_offset(PreConfiguredTopic1), + Offset2 = resolve_kafka_offset(PreConfiguredTopic2), {ok, _} = emqtt:publish(C, RuleTopic, Payload(#{n => 1, p => <<"p1">>}), [{qos, 1}]), {ok, _} = emqtt:publish(C, RuleTopic, Payload(#{n => 2, p => <<"p2">>}), [{qos, 1}]), - check_kafka_message_payload(PreConfigureTopic1, Offset1, <<"p1">>), - check_kafka_message_payload(PreConfigureTopic2, Offset2, <<"p2">>), + check_kafka_message_payload(PreConfiguredTopic1, Offset1, <<"p1">>), + check_kafka_message_payload(PreConfiguredTopic2, Offset2, <<"p2">>), ActionId = emqx_bridge_v2:id(Type, ActionName), ?assertEqual(2, emqx_resource_metrics:matched_get(ActionId)), @@ -1046,64 +1042,3 @@ t_pre_configured_topics(Config) -> [] ), ok. - -%% Checks that creating an action with templated topic and no pre-configured kafka topics -%% throws. -t_templated_topic_and_no_pre_configured_topics(Config) -> - Type = proplists:get_value(type, Config, ?TYPE), - ConnectorName = proplists:get_value(connector_name, Config, <<"c">>), - ConnectorConfig = proplists:get_value(connector_config, Config, connector_config()), - ActionName = <<"bad_pre_configured_topics">>, - ActionConfig1 = proplists:get_value(action_config, Config, action_config(ConnectorName)), - ActionConfig = emqx_bridge_v2_testlib:parse_and_check( - action, - Type, - ActionName, - emqx_utils_maps:deep_merge( - ActionConfig1, - #{ - <<"parameters">> => #{ - <<"topic">> => <<"pct${.payload.n}">>, - <<"pre_configured_topics">> => [] - } - } - ) - ), - ?check_trace( - #{timetrap => 7_000}, - begin - ConnectorParams = [ - {connector_config, ConnectorConfig}, - {connector_name, ConnectorName}, - {connector_type, Type} - ], - ActionParams = [ - {action_config, ActionConfig}, - {action_name, ActionName}, - {action_type, Type} - ], - {ok, {{_, 201, _}, _, #{}}} = - emqx_bridge_v2_testlib:create_connector_api(ConnectorParams), - - {ok, {{_, 201, _}, _, #{}}} = - emqx_bridge_v2_testlib:create_action_api(ActionParams), - - ?assertMatch( - {ok, - {{_, 200, _}, _, #{ - <<"status_reason">> := - << - "Either the Kafka topic must be fixed (not a template)," - " or at least one pre-defined topic must be set." - >>, - <<"status">> := <<"disconnected">>, - <<"node_status">> := [#{<<"status">> := <<"disconnected">>}] - }}}, - emqx_bridge_v2_testlib:get_bridge_api(Type, ActionName) - ), - - ok - end, - [] - ), - ok. diff --git a/rel/i18n/emqx_bridge_azure_event_hub.hocon b/rel/i18n/emqx_bridge_azure_event_hub.hocon index e683bc9e9..7e37d2e4c 100644 --- a/rel/i18n/emqx_bridge_azure_event_hub.hocon +++ b/rel/i18n/emqx_bridge_azure_event_hub.hocon @@ -350,14 +350,4 @@ Setting this to a value which is greater than the total number of partitions in partitions_limit.label: """Max Partitions""" -producer_pre_configured_topics.label: -"""Pre-configured Event Hubs""" -producer_pre_configured_topics.desc: -"""A list of pre-configured event hubs to be used when using templates to define outgoing topics. If the topic template fails to resolve to a value due to missing data in the incoming message, or if it resolves to a topic that is not contained in this list, then publishing will fail.""" - -pre_configured_topic.label: -"""Event Hubs Name""" -pre_configured_topic.desc: -"""Event Hubs name""" - } diff --git a/rel/i18n/emqx_bridge_confluent_producer.hocon b/rel/i18n/emqx_bridge_confluent_producer.hocon index 81c2c0a89..38623502e 100644 --- a/rel/i18n/emqx_bridge_confluent_producer.hocon +++ b/rel/i18n/emqx_bridge_confluent_producer.hocon @@ -350,14 +350,4 @@ server_name_indication.desc: server_name_indication.label: """SNI""" -producer_pre_configured_topics.label: -"""Pre-configured Topics""" -producer_pre_configured_topics.desc: -"""A list of pre-configured topics to be used when using templates to define outgoing topics. If the topic template fails to resolve to a value due to missing data in the incoming message, or if it resolves to a topic that is not contained in this list, then publishing will fail.""" - -pre_configured_topic.label: -"""Kafka Topic Name""" -pre_configured_topic.desc: -"""Kafka topic name""" - } diff --git a/rel/i18n/emqx_bridge_kafka.hocon b/rel/i18n/emqx_bridge_kafka.hocon index 59896cc22..a066d30fc 100644 --- a/rel/i18n/emqx_bridge_kafka.hocon +++ b/rel/i18n/emqx_bridge_kafka.hocon @@ -446,14 +446,4 @@ server_name_indication.desc: server_name_indication.label: """SNI""" -producer_pre_configured_topics.label: -"""Pre-configured Topics""" -producer_pre_configured_topics.desc: -"""A list of pre-configured topics to be used when using templates to define outgoing topics. If the topic template fails to resolve to a value due to missing data in the incoming message, or if it resolves to a topic that is not contained in this list, then publishing will fail.""" - -pre_configured_topic.label: -"""Kafka Topic Name""" -pre_configured_topic.desc: -"""Kafka topic name""" - } From 1d56ac6e5e87c21a3986f79c061f752d95f0ea48 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Fri, 26 Jul 2024 14:26:21 -0300 Subject: [PATCH 4/5] refactor: change topic schema type --- apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl index 8f72523b1..254e84036 100644 --- a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl +++ b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl @@ -391,7 +391,7 @@ fields(v1_producer_kafka_opts) -> ); fields(producer_kafka_opts) -> [ - {topic, mk(string(), #{required => true, desc => ?DESC(kafka_topic)})}, + {topic, mk(emqx_schema:template(), #{required => true, desc => ?DESC(kafka_topic)})}, {message, mk(ref(kafka_message), #{required => false, desc => ?DESC(kafka_message)})}, {max_batch_bytes, mk(emqx_schema:bytesize(), #{default => <<"896KB">>, desc => ?DESC(max_batch_bytes)})}, From 6786c9b51738f30ffd85ed73145ded9913bf1cd6 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Mon, 29 Jul 2024 09:45:52 -0300 Subject: [PATCH 5/5] refactor: improve descriptions and identifiers Co-authored-by: zmstone --- .../src/emqx_bridge_kafka_impl_producer.erl | 2 +- changes/ee/feat-13452.en.md | 6 +++++- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl index b358cd42b..fb7fce63c 100644 --- a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl +++ b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl @@ -839,7 +839,7 @@ handle_telemetry_event(_EventId, _Metrics, _MetaData, _HandlerConfig) -> %% metrics. -spec telemetry_handler_id(action_resource_id()) -> binary(). telemetry_handler_id(ActionResId) -> - <<"emqx-bridge-kafka-producer-", ActionResId/binary>>. + ActionResId. uninstall_telemetry_handlers(TelemetryId) -> telemetry:detach(TelemetryId). diff --git a/changes/ee/feat-13452.en.md b/changes/ee/feat-13452.en.md index 95dae8d32..7b2427329 100644 --- a/changes/ee/feat-13452.en.md +++ b/changes/ee/feat-13452.en.md @@ -1 +1,5 @@ -Added to possibility to configure a list of predefined Kafka topics to Kafka producer actions, and also to use templates to define the destination Kafka topic. +Kafka producer action's `topic` config now supports templates. + +The topics must be already created in Kafka. If a message is rendered towards a non-existing topic in Kafka (given Kafka disabled topic auto-creation), the message will fail with an unrecoverable error. Also, if a message does not contain enough information to render to the configured template (e.g.: the template is `t-${t}` and the message context does not define `t`), this message will also fail with an unrecoverable error. + +This same feature is also available for Azure Event Hubs and Confluent Platform producer integrations.