diff --git a/apps/emqx_bridge_azure_event_hub/test/emqx_bridge_azure_event_hub_v2_SUITE.erl b/apps/emqx_bridge_azure_event_hub/test/emqx_bridge_azure_event_hub_v2_SUITE.erl index 0136ec568..f2a06cf65 100644 --- a/apps/emqx_bridge_azure_event_hub/test/emqx_bridge_azure_event_hub_v2_SUITE.erl +++ b/apps/emqx_bridge_azure_event_hub/test/emqx_bridge_azure_event_hub_v2_SUITE.erl @@ -382,46 +382,31 @@ t_multiple_actions_sharing_topic(Config) -> ActionConfig0, #{<<"parameters">> => #{<<"query_mode">> => <<"sync">>}} ), - ok = emqx_bridge_v2_kafka_producer_SUITE:t_multiple_actions_sharing_topic( - [ - {type, ?BRIDGE_TYPE_BIN}, - {connector_name, ?config(connector_name, Config)}, - {connector_config, ?config(connector_config, Config)}, - {action_config, ActionConfig} - ] - ), + ok = + emqx_bridge_v2_kafka_producer_SUITE:?FUNCTION_NAME( + [ + {type, ?BRIDGE_TYPE_BIN}, + {connector_name, ?config(connector_name, Config)}, + {connector_config, ?config(connector_config, Config)}, + {action_config, ActionConfig} + ] + ), ok. -t_pre_configured_topics(Config) -> +t_dynamic_topics(Config) -> ActionConfig0 = ?config(action_config, Config), ActionConfig = emqx_utils_maps:deep_merge( ActionConfig0, #{<<"parameters">> => #{<<"query_mode">> => <<"sync">>}} ), - ok = emqx_bridge_v2_kafka_producer_SUITE:t_pre_configured_topics( - [ - {type, ?BRIDGE_TYPE_BIN}, - {connector_name, ?config(connector_name, Config)}, - {connector_config, ?config(connector_config, Config)}, - {action_config, ActionConfig} - ] - ), - ok. - -t_templated_topic_and_no_pre_configured_topics(Config) -> - ActionConfig0 = ?config(action_config, Config), - ActionConfig = - emqx_utils_maps:deep_merge( - ActionConfig0, - #{<<"parameters">> => #{<<"query_mode">> => <<"sync">>}} + ok = + emqx_bridge_v2_kafka_producer_SUITE:?FUNCTION_NAME( + [ + {type, ?BRIDGE_TYPE_BIN}, + {connector_name, ?config(connector_name, Config)}, + {connector_config, ?config(connector_config, Config)}, + {action_config, ActionConfig} + ] ), - ok = emqx_bridge_v2_kafka_producer_SUITE:t_templated_topic_and_no_pre_configured_topics( - [ - {type, ?BRIDGE_TYPE_BIN}, - {connector_name, ?config(connector_name, Config)}, - {connector_config, ?config(connector_config, Config)}, - {action_config, ActionConfig} - ] - ), ok. diff --git a/apps/emqx_bridge_confluent/test/emqx_bridge_confluent_producer_SUITE.erl b/apps/emqx_bridge_confluent/test/emqx_bridge_confluent_producer_SUITE.erl index de92b9327..f10e88463 100644 --- a/apps/emqx_bridge_confluent/test/emqx_bridge_confluent_producer_SUITE.erl +++ b/apps/emqx_bridge_confluent/test/emqx_bridge_confluent_producer_SUITE.erl @@ -391,46 +391,31 @@ t_multiple_actions_sharing_topic(Config) -> ActionConfig0, #{<<"parameters">> => #{<<"query_mode">> => <<"sync">>}} ), - ok = emqx_bridge_v2_kafka_producer_SUITE:t_multiple_actions_sharing_topic( - [ - {type, ?ACTION_TYPE_BIN}, - {connector_name, ?config(connector_name, Config)}, - {connector_config, ?config(connector_config, Config)}, - {action_config, ActionConfig} - ] - ), + ok = + emqx_bridge_v2_kafka_producer_SUITE:?FUNCTION_NAME( + [ + {type, ?ACTION_TYPE_BIN}, + {connector_name, ?config(connector_name, Config)}, + {connector_config, ?config(connector_config, Config)}, + {action_config, ActionConfig} + ] + ), ok. -t_pre_configured_topics(Config) -> +t_dynamic_topics(Config) -> ActionConfig0 = ?config(action_config, Config), ActionConfig = emqx_utils_maps:deep_merge( ActionConfig0, #{<<"parameters">> => #{<<"query_mode">> => <<"sync">>}} ), - ok = emqx_bridge_v2_kafka_producer_SUITE:t_pre_configured_topics( - [ - {type, ?ACTION_TYPE_BIN}, - {connector_name, ?config(connector_name, Config)}, - {connector_config, ?config(connector_config, Config)}, - {action_config, ActionConfig} - ] - ), - ok. - -t_templated_topic_and_no_pre_configured_topics(Config) -> - ActionConfig0 = ?config(action_config, Config), - ActionConfig = - emqx_utils_maps:deep_merge( - ActionConfig0, - #{<<"parameters">> => #{<<"query_mode">> => <<"sync">>}} + ok = + emqx_bridge_v2_kafka_producer_SUITE:?FUNCTION_NAME( + [ + {type, ?ACTION_TYPE_BIN}, + {connector_name, ?config(connector_name, Config)}, + {connector_config, ?config(connector_config, Config)}, + {action_config, ActionConfig} + ] ), - ok = emqx_bridge_v2_kafka_producer_SUITE:t_templated_topic_and_no_pre_configured_topics( - [ - {type, ?ACTION_TYPE_BIN}, - {connector_name, ?config(connector_name, Config)}, - {connector_config, ?config(connector_config, Config)}, - {action_config, ActionConfig} - ] - ), ok. diff --git a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl index 9a2fa91cf..8f72523b1 100644 --- a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl +++ b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl @@ -307,10 +307,6 @@ fields(kafka_producer_action) -> {tags, emqx_schema:tags_schema()}, {description, emqx_schema:description_schema()} ] ++ producer_opts(action); -fields(pre_configured_topic) -> - [ - {topic, mk(string(), #{required => true, desc => ?DESC(kafka_topic)})} - ]; fields(kafka_consumer) -> connector_config_fields() ++ fields(consumer_opts); fields(ssl_client_opts) -> @@ -396,14 +392,6 @@ fields(v1_producer_kafka_opts) -> fields(producer_kafka_opts) -> [ {topic, mk(string(), #{required => true, desc => ?DESC(kafka_topic)})}, - {pre_configured_topics, - mk( - hoconsc:array(ref(pre_configured_topic)), - #{ - default => [], - desc => ?DESC("producer_pre_configured_topics") - } - )}, {message, mk(ref(kafka_message), #{required => false, desc => ?DESC(kafka_message)})}, {max_batch_bytes, mk(emqx_schema:bytesize(), #{default => <<"896KB">>, desc => ?DESC(max_batch_bytes)})}, diff --git a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl index 80de98402..b358cd42b 100644 --- a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl +++ b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl @@ -3,6 +3,8 @@ %%-------------------------------------------------------------------- -module(emqx_bridge_kafka_impl_producer). +-feature(maybe_expr, enable). + -behaviour(emqx_resource). -include_lib("emqx_resource/include/emqx_resource.hrl"). @@ -132,37 +134,22 @@ create_producers_for_bridge_v2( ) -> #{ message := MessageTemplate, - pre_configured_topics := PreConfiguredTopics0, topic := KafkaTopic0, sync_query_timeout := SyncQueryTimeout } = KafkaConfig, - TopicTemplate = {TopicType, KafkaTopic} = maybe_preproc_topic(KafkaTopic0), - PreConfiguredTopics = [T || #{topic := T} <- PreConfiguredTopics0], - KafkaTopics0 = + TopicTemplate = {TopicType, TopicOrTemplate} = maybe_preproc_topic(KafkaTopic0), + MKafkaTopic = case TopicType of - fixed -> - [KafkaTopic | PreConfiguredTopics]; - dynamic -> - PreConfiguredTopics + fixed -> TopicOrTemplate; + dynamic -> dynamic end, - case KafkaTopics0 of - [] -> - throw(<< - "Either the Kafka topic must be fixed (not a template)," - " or at least one pre-defined topic must be set." - >>); - _ -> - ok - end, - KafkaTopics = lists:map(fun bin/1, KafkaTopics0), KafkaHeadersTokens = preproc_kafka_headers(maps:get(kafka_headers, KafkaConfig, undefined)), KafkaExtHeadersTokens = preproc_ext_headers(maps:get(kafka_ext_headers, KafkaConfig, [])), KafkaHeadersValEncodeMode = maps:get(kafka_header_value_encode_mode, KafkaConfig, none), MaxPartitions = maps:get(partitions_limit, KafkaConfig, all_partitions), #{name := BridgeName} = emqx_bridge_v2:parse_id(ActionResId), IsDryRun = emqx_resource:is_dry_run(ActionResId), - [AKafkaTopic | _] = KafkaTopics, - ok = check_topic_and_leader_connections(ActionResId, ClientId, AKafkaTopic, MaxPartitions), + ok = check_topic_and_leader_connections(ActionResId, ClientId, MKafkaTopic, MaxPartitions), WolffProducerConfig = producers_config( BridgeType, BridgeName, KafkaConfig, IsDryRun, ActionResId ), @@ -179,7 +166,7 @@ create_producers_for_bridge_v2( message_template => compile_message_template(MessageTemplate), kafka_client_id => ClientId, topic_template => TopicTemplate, - pre_configured_topics => KafkaTopics, + topic => MKafkaTopic, producers => Producers, resource_id => ActionResId, connector_resource_id => ConnResId, @@ -195,7 +182,7 @@ create_producers_for_bridge_v2( msg => "failed_to_start_kafka_producer", instance_id => ConnResId, kafka_client_id => ClientId, - kafka_topic => KafkaTopic, + kafka_topic => MKafkaTopic, reason => Reason2 }), throw( @@ -326,7 +313,6 @@ on_query( message_template := MessageTemplate, topic_template := TopicTemplate, producers := Producers, - pre_configured_topics := PreConfiguredTopics, sync_query_timeout := SyncTimeout, headers_tokens := KafkaHeadersTokens, ext_headers_tokens := KafkaExtHeadersTokens, @@ -339,12 +325,6 @@ on_query( }, try KafkaTopic = render_topic(TopicTemplate, Message), - case lists:member(KafkaTopic, PreConfiguredTopics) of - false -> - throw({unknown_topic, KafkaTopic}); - true -> - ok - end, KafkaMessage = render_message(MessageTemplate, KafkaHeaders, Message), ?tp( emqx_bridge_kafka_impl_producer_sync_query, @@ -358,7 +338,7 @@ on_query( throw:bad_topic -> ?tp("kafka_producer_failed_to_render_topic", #{}), {error, {unrecoverable_error, failed_to_render_topic}}; - throw:{unknown_topic, Topic} -> + throw:#{cause := unknown_topic_or_partition, topic := Topic} -> ?tp("kafka_producer_resolved_to_unknown_topic", #{}), {error, {unrecoverable_error, {resolved_to_unknown_topic, Topic}}}; throw:#{cause := invalid_partition_count, count := Count} -> @@ -408,7 +388,6 @@ on_query_async( message_template := Template, topic_template := TopicTemplate, producers := Producers, - pre_configured_topics := PreConfiguredTopics, headers_tokens := KafkaHeadersTokens, ext_headers_tokens := KafkaExtHeadersTokens, headers_val_encode_mode := KafkaHeadersValEncodeMode @@ -420,12 +399,6 @@ on_query_async( }, try KafkaTopic = render_topic(TopicTemplate, Message), - case lists:member(KafkaTopic, PreConfiguredTopics) of - false -> - throw({unknown_topic, KafkaTopic}); - true -> - ok - end, KafkaMessage = render_message(Template, KafkaHeaders, Message), ?tp( emqx_bridge_kafka_impl_producer_async_query, @@ -439,7 +412,7 @@ on_query_async( throw:bad_topic -> ?tp("kafka_producer_failed_to_render_topic", #{}), {error, {unrecoverable_error, failed_to_render_topic}}; - throw:{unknown_topic, Topic} -> + throw:#{cause := unknown_topic_or_partition, topic := Topic} -> ?tp("kafka_producer_resolved_to_unknown_topic", #{}), {error, {unrecoverable_error, {resolved_to_unknown_topic, Topic}}}; throw:#{cause := invalid_partition_count, count := Count} -> @@ -618,12 +591,11 @@ on_get_channel_status( %% connector, thus potentially dropping data held in wolff producer's replayq. The %% only exception is if the topic does not exist ("unhealthy target"). #{ - pre_configured_topics := PreConfiguredTopics, + topic := MKafkaTopic, partitions_limit := MaxPartitions } = maps:get(ActionResId, Channels), - [KafkaTopic | _] = PreConfiguredTopics, try - ok = check_topic_and_leader_connections(ActionResId, ClientId, KafkaTopic, MaxPartitions), + ok = check_topic_and_leader_connections(ActionResId, ClientId, MKafkaTopic, MaxPartitions), ?status_connected catch throw:{unhealthy_target, Msg} -> @@ -632,22 +604,29 @@ on_get_channel_status( {?status_connecting, {K, E}} end. -check_topic_and_leader_connections(ActionResId, ClientId, KafkaTopic, MaxPartitions) -> +check_topic_and_leader_connections(ActionResId, ClientId, MKafkaTopic, MaxPartitions) -> case wolff_client_sup:find_client(ClientId) of {ok, Pid} -> - ok = check_topic_status(ClientId, Pid, KafkaTopic), - ok = check_if_healthy_leaders(ActionResId, ClientId, Pid, KafkaTopic, MaxPartitions); + maybe + true ?= is_binary(MKafkaTopic), + ok = check_topic_status(ClientId, Pid, MKafkaTopic), + ok = check_if_healthy_leaders( + ActionResId, ClientId, Pid, MKafkaTopic, MaxPartitions + ) + else + false -> ok + end; {error, #{reason := no_such_client}} -> throw(#{ reason => cannot_find_kafka_client, kafka_client => ClientId, - kafka_topic => KafkaTopic + kafka_topic => MKafkaTopic }); {error, #{reason := client_supervisor_not_initialized}} -> throw(#{ reason => restarting, kafka_client => ClientId, - kafka_topic => KafkaTopic + kafka_topic => MKafkaTopic }) end. diff --git a/apps/emqx_bridge_kafka/test/emqx_bridge_v2_kafka_producer_SUITE.erl b/apps/emqx_bridge_kafka/test/emqx_bridge_v2_kafka_producer_SUITE.erl index 6246faaf1..08b2723e7 100644 --- a/apps/emqx_bridge_kafka/test/emqx_bridge_v2_kafka_producer_SUITE.erl +++ b/apps/emqx_bridge_kafka/test/emqx_bridge_v2_kafka_producer_SUITE.erl @@ -915,17 +915,17 @@ t_multiple_actions_sharing_topic(Config) -> ), ok. -%% Smoke tests for using a templated topic and a list of pre-configured kafka topics. -t_pre_configured_topics(Config) -> +%% Smoke tests for using a templated topic and adynamic kafka topics. +t_dynamic_topics(Config) -> Type = proplists:get_value(type, Config, ?TYPE), ConnectorName = proplists:get_value(connector_name, Config, <<"c">>), ConnectorConfig = proplists:get_value(connector_config, Config, connector_config()), - ActionName = <<"pre_configured_topics">>, + ActionName = <<"dynamic_topics">>, ActionConfig1 = proplists:get_value(action_config, Config, action_config(ConnectorName)), - PreConfigureTopic1 = <<"pct1">>, - PreConfigureTopic2 = <<"pct2">>, - ensure_kafka_topic(PreConfigureTopic1), - ensure_kafka_topic(PreConfigureTopic2), + PreConfiguredTopic1 = <<"pct1">>, + PreConfiguredTopic2 = <<"pct2">>, + ensure_kafka_topic(PreConfiguredTopic1), + ensure_kafka_topic(PreConfiguredTopic2), ActionConfig = emqx_bridge_v2_testlib:parse_and_check( action, Type, @@ -938,11 +938,7 @@ t_pre_configured_topics(Config) -> <<"message">> => #{ <<"key">> => <<"${.clientid}">>, <<"value">> => <<"${.payload.p}">> - }, - <<"pre_configured_topics">> => [ - #{<<"topic">> => PreConfigureTopic1}, - #{<<"topic">> => PreConfigureTopic2} - ] + } } } ) @@ -1001,13 +997,13 @@ t_pre_configured_topics(Config) -> {ok, C} = emqtt:start_link(#{}), {ok, _} = emqtt:connect(C), Payload = fun(Map) -> emqx_utils_json:encode(Map) end, - Offset1 = resolve_kafka_offset(PreConfigureTopic1), - Offset2 = resolve_kafka_offset(PreConfigureTopic2), + Offset1 = resolve_kafka_offset(PreConfiguredTopic1), + Offset2 = resolve_kafka_offset(PreConfiguredTopic2), {ok, _} = emqtt:publish(C, RuleTopic, Payload(#{n => 1, p => <<"p1">>}), [{qos, 1}]), {ok, _} = emqtt:publish(C, RuleTopic, Payload(#{n => 2, p => <<"p2">>}), [{qos, 1}]), - check_kafka_message_payload(PreConfigureTopic1, Offset1, <<"p1">>), - check_kafka_message_payload(PreConfigureTopic2, Offset2, <<"p2">>), + check_kafka_message_payload(PreConfiguredTopic1, Offset1, <<"p1">>), + check_kafka_message_payload(PreConfiguredTopic2, Offset2, <<"p2">>), ActionId = emqx_bridge_v2:id(Type, ActionName), ?assertEqual(2, emqx_resource_metrics:matched_get(ActionId)), @@ -1046,64 +1042,3 @@ t_pre_configured_topics(Config) -> [] ), ok. - -%% Checks that creating an action with templated topic and no pre-configured kafka topics -%% throws. -t_templated_topic_and_no_pre_configured_topics(Config) -> - Type = proplists:get_value(type, Config, ?TYPE), - ConnectorName = proplists:get_value(connector_name, Config, <<"c">>), - ConnectorConfig = proplists:get_value(connector_config, Config, connector_config()), - ActionName = <<"bad_pre_configured_topics">>, - ActionConfig1 = proplists:get_value(action_config, Config, action_config(ConnectorName)), - ActionConfig = emqx_bridge_v2_testlib:parse_and_check( - action, - Type, - ActionName, - emqx_utils_maps:deep_merge( - ActionConfig1, - #{ - <<"parameters">> => #{ - <<"topic">> => <<"pct${.payload.n}">>, - <<"pre_configured_topics">> => [] - } - } - ) - ), - ?check_trace( - #{timetrap => 7_000}, - begin - ConnectorParams = [ - {connector_config, ConnectorConfig}, - {connector_name, ConnectorName}, - {connector_type, Type} - ], - ActionParams = [ - {action_config, ActionConfig}, - {action_name, ActionName}, - {action_type, Type} - ], - {ok, {{_, 201, _}, _, #{}}} = - emqx_bridge_v2_testlib:create_connector_api(ConnectorParams), - - {ok, {{_, 201, _}, _, #{}}} = - emqx_bridge_v2_testlib:create_action_api(ActionParams), - - ?assertMatch( - {ok, - {{_, 200, _}, _, #{ - <<"status_reason">> := - << - "Either the Kafka topic must be fixed (not a template)," - " or at least one pre-defined topic must be set." - >>, - <<"status">> := <<"disconnected">>, - <<"node_status">> := [#{<<"status">> := <<"disconnected">>}] - }}}, - emqx_bridge_v2_testlib:get_bridge_api(Type, ActionName) - ), - - ok - end, - [] - ), - ok. diff --git a/rel/i18n/emqx_bridge_azure_event_hub.hocon b/rel/i18n/emqx_bridge_azure_event_hub.hocon index e683bc9e9..7e37d2e4c 100644 --- a/rel/i18n/emqx_bridge_azure_event_hub.hocon +++ b/rel/i18n/emqx_bridge_azure_event_hub.hocon @@ -350,14 +350,4 @@ Setting this to a value which is greater than the total number of partitions in partitions_limit.label: """Max Partitions""" -producer_pre_configured_topics.label: -"""Pre-configured Event Hubs""" -producer_pre_configured_topics.desc: -"""A list of pre-configured event hubs to be used when using templates to define outgoing topics. If the topic template fails to resolve to a value due to missing data in the incoming message, or if it resolves to a topic that is not contained in this list, then publishing will fail.""" - -pre_configured_topic.label: -"""Event Hubs Name""" -pre_configured_topic.desc: -"""Event Hubs name""" - } diff --git a/rel/i18n/emqx_bridge_confluent_producer.hocon b/rel/i18n/emqx_bridge_confluent_producer.hocon index 81c2c0a89..38623502e 100644 --- a/rel/i18n/emqx_bridge_confluent_producer.hocon +++ b/rel/i18n/emqx_bridge_confluent_producer.hocon @@ -350,14 +350,4 @@ server_name_indication.desc: server_name_indication.label: """SNI""" -producer_pre_configured_topics.label: -"""Pre-configured Topics""" -producer_pre_configured_topics.desc: -"""A list of pre-configured topics to be used when using templates to define outgoing topics. If the topic template fails to resolve to a value due to missing data in the incoming message, or if it resolves to a topic that is not contained in this list, then publishing will fail.""" - -pre_configured_topic.label: -"""Kafka Topic Name""" -pre_configured_topic.desc: -"""Kafka topic name""" - } diff --git a/rel/i18n/emqx_bridge_kafka.hocon b/rel/i18n/emqx_bridge_kafka.hocon index 59896cc22..a066d30fc 100644 --- a/rel/i18n/emqx_bridge_kafka.hocon +++ b/rel/i18n/emqx_bridge_kafka.hocon @@ -446,14 +446,4 @@ server_name_indication.desc: server_name_indication.label: """SNI""" -producer_pre_configured_topics.label: -"""Pre-configured Topics""" -producer_pre_configured_topics.desc: -"""A list of pre-configured topics to be used when using templates to define outgoing topics. If the topic template fails to resolve to a value due to missing data in the incoming message, or if it resolves to a topic that is not contained in this list, then publishing will fail.""" - -pre_configured_topic.label: -"""Kafka Topic Name""" -pre_configured_topic.desc: -"""Kafka topic name""" - }