diff --git a/apps/emqx_bridge_azure_event_hub/test/emqx_bridge_azure_event_hub_v2_SUITE.erl b/apps/emqx_bridge_azure_event_hub/test/emqx_bridge_azure_event_hub_v2_SUITE.erl index 661b8819c..0136ec568 100644 --- a/apps/emqx_bridge_azure_event_hub/test/emqx_bridge_azure_event_hub_v2_SUITE.erl +++ b/apps/emqx_bridge_azure_event_hub/test/emqx_bridge_azure_event_hub_v2_SUITE.erl @@ -391,3 +391,37 @@ t_multiple_actions_sharing_topic(Config) -> ] ), ok. + +t_pre_configured_topics(Config) -> + ActionConfig0 = ?config(action_config, Config), + ActionConfig = + emqx_utils_maps:deep_merge( + ActionConfig0, + #{<<"parameters">> => #{<<"query_mode">> => <<"sync">>}} + ), + ok = emqx_bridge_v2_kafka_producer_SUITE:t_pre_configured_topics( + [ + {type, ?BRIDGE_TYPE_BIN}, + {connector_name, ?config(connector_name, Config)}, + {connector_config, ?config(connector_config, Config)}, + {action_config, ActionConfig} + ] + ), + ok. + +t_templated_topic_and_no_pre_configured_topics(Config) -> + ActionConfig0 = ?config(action_config, Config), + ActionConfig = + emqx_utils_maps:deep_merge( + ActionConfig0, + #{<<"parameters">> => #{<<"query_mode">> => <<"sync">>}} + ), + ok = emqx_bridge_v2_kafka_producer_SUITE:t_templated_topic_and_no_pre_configured_topics( + [ + {type, ?BRIDGE_TYPE_BIN}, + {connector_name, ?config(connector_name, Config)}, + {connector_config, ?config(connector_config, Config)}, + {action_config, ActionConfig} + ] + ), + ok. diff --git a/apps/emqx_bridge_confluent/test/emqx_bridge_confluent_producer_SUITE.erl b/apps/emqx_bridge_confluent/test/emqx_bridge_confluent_producer_SUITE.erl index 0b3a22a99..de92b9327 100644 --- a/apps/emqx_bridge_confluent/test/emqx_bridge_confluent_producer_SUITE.erl +++ b/apps/emqx_bridge_confluent/test/emqx_bridge_confluent_producer_SUITE.erl @@ -400,3 +400,37 @@ t_multiple_actions_sharing_topic(Config) -> ] ), ok. + +t_pre_configured_topics(Config) -> + ActionConfig0 = ?config(action_config, Config), + ActionConfig = + emqx_utils_maps:deep_merge( + ActionConfig0, + #{<<"parameters">> => #{<<"query_mode">> => <<"sync">>}} + ), + ok = emqx_bridge_v2_kafka_producer_SUITE:t_pre_configured_topics( + [ + {type, ?ACTION_TYPE_BIN}, + {connector_name, ?config(connector_name, Config)}, + {connector_config, ?config(connector_config, Config)}, + {action_config, ActionConfig} + ] + ), + ok. + +t_templated_topic_and_no_pre_configured_topics(Config) -> + ActionConfig0 = ?config(action_config, Config), + ActionConfig = + emqx_utils_maps:deep_merge( + ActionConfig0, + #{<<"parameters">> => #{<<"query_mode">> => <<"sync">>}} + ), + ok = emqx_bridge_v2_kafka_producer_SUITE:t_templated_topic_and_no_pre_configured_topics( + [ + {type, ?ACTION_TYPE_BIN}, + {connector_name, ?config(connector_name, Config)}, + {connector_config, ?config(connector_config, Config)}, + {action_config, ActionConfig} + ] + ), + ok. diff --git a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl index 83bc33266..9a2fa91cf 100644 --- a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl +++ b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl @@ -295,6 +295,7 @@ fields("config_producer") -> fields("config_consumer") -> fields(kafka_consumer); fields(kafka_producer) -> + %% Schema used by bridges V1. connector_config_fields() ++ producer_opts(v1); fields(kafka_producer_action) -> [ @@ -306,6 +307,10 @@ fields(kafka_producer_action) -> {tags, emqx_schema:tags_schema()}, {description, emqx_schema:description_schema()} ] ++ producer_opts(action); +fields(pre_configured_topic) -> + [ + {topic, mk(string(), #{required => true, desc => ?DESC(kafka_topic)})} + ]; fields(kafka_consumer) -> connector_config_fields() ++ fields(consumer_opts); fields(ssl_client_opts) -> @@ -364,9 +369,41 @@ fields(socket_opts) -> validator => fun emqx_schema:validate_tcp_keepalive/1 })} ]; +fields(v1_producer_kafka_opts) -> + OldSchemaFields = + [ + topic, + message, + max_batch_bytes, + compression, + partition_strategy, + required_acks, + kafka_headers, + kafka_ext_headers, + kafka_header_value_encode_mode, + partition_count_refresh_interval, + partitions_limit, + max_inflight, + buffer, + query_mode, + sync_query_timeout + ], + Fields = fields(producer_kafka_opts), + lists:filter( + fun({K, _V}) -> lists:member(K, OldSchemaFields) end, + Fields + ); fields(producer_kafka_opts) -> [ {topic, mk(string(), #{required => true, desc => ?DESC(kafka_topic)})}, + {pre_configured_topics, + mk( + hoconsc:array(ref(pre_configured_topic)), + #{ + default => [], + desc => ?DESC("producer_pre_configured_topics") + } + )}, {message, mk(ref(kafka_message), #{required => false, desc => ?DESC(kafka_message)})}, {max_batch_bytes, mk(emqx_schema:bytesize(), #{default => <<"896KB">>, desc => ?DESC(max_batch_bytes)})}, @@ -675,15 +712,15 @@ resource_opts() -> %% However we need to keep it backward compatible for generated schema json (version 0.1.0) %% since schema is data for the 'schemas' API. parameters_field(ActionOrBridgeV1) -> - {Name, Alias} = + {Name, Alias, Ref} = case ActionOrBridgeV1 of v1 -> - {kafka, parameters}; + {kafka, parameters, v1_producer_kafka_opts}; action -> - {parameters, kafka} + {parameters, kafka, producer_kafka_opts} end, {Name, - mk(ref(producer_kafka_opts), #{ + mk(ref(Ref), #{ required => true, aliases => [Alias], desc => ?DESC(producer_kafka_opts), diff --git a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl index 6d88a329e..80de98402 100644 --- a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl +++ b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl @@ -122,8 +122,8 @@ on_add_channel( {ok, NewState}. create_producers_for_bridge_v2( - InstId, - BridgeV2Id, + ConnResId, + ActionResId, ClientId, #{ bridge_type := BridgeType, @@ -132,33 +132,57 @@ create_producers_for_bridge_v2( ) -> #{ message := MessageTemplate, - topic := KafkaTopic, + pre_configured_topics := PreConfiguredTopics0, + topic := KafkaTopic0, sync_query_timeout := SyncQueryTimeout } = KafkaConfig, + TopicTemplate = {TopicType, KafkaTopic} = maybe_preproc_topic(KafkaTopic0), + PreConfiguredTopics = [T || #{topic := T} <- PreConfiguredTopics0], + KafkaTopics0 = + case TopicType of + fixed -> + [KafkaTopic | PreConfiguredTopics]; + dynamic -> + PreConfiguredTopics + end, + case KafkaTopics0 of + [] -> + throw(<< + "Either the Kafka topic must be fixed (not a template)," + " or at least one pre-defined topic must be set." + >>); + _ -> + ok + end, + KafkaTopics = lists:map(fun bin/1, KafkaTopics0), KafkaHeadersTokens = preproc_kafka_headers(maps:get(kafka_headers, KafkaConfig, undefined)), KafkaExtHeadersTokens = preproc_ext_headers(maps:get(kafka_ext_headers, KafkaConfig, [])), KafkaHeadersValEncodeMode = maps:get(kafka_header_value_encode_mode, KafkaConfig, none), MaxPartitions = maps:get(partitions_limit, KafkaConfig, all_partitions), - #{name := BridgeName} = emqx_bridge_v2:parse_id(BridgeV2Id), - IsDryRun = emqx_resource:is_dry_run(BridgeV2Id), - ok = check_topic_and_leader_connections(ClientId, KafkaTopic, MaxPartitions), + #{name := BridgeName} = emqx_bridge_v2:parse_id(ActionResId), + IsDryRun = emqx_resource:is_dry_run(ActionResId), + [AKafkaTopic | _] = KafkaTopics, + ok = check_topic_and_leader_connections(ActionResId, ClientId, AKafkaTopic, MaxPartitions), WolffProducerConfig = producers_config( - BridgeType, BridgeName, KafkaConfig, IsDryRun, BridgeV2Id + BridgeType, BridgeName, KafkaConfig, IsDryRun, ActionResId ), - case wolff:ensure_supervised_producers(ClientId, KafkaTopic, WolffProducerConfig) of + case wolff:ensure_supervised_dynamic_producers(ClientId, WolffProducerConfig) of {ok, Producers} -> - ok = emqx_resource:allocate_resource(InstId, {?kafka_producers, BridgeV2Id}, Producers), ok = emqx_resource:allocate_resource( - InstId, {?kafka_telemetry_id, BridgeV2Id}, BridgeV2Id + ConnResId, {?kafka_producers, ActionResId}, Producers ), - _ = maybe_install_wolff_telemetry_handlers(BridgeV2Id), + ok = emqx_resource:allocate_resource( + ConnResId, {?kafka_telemetry_id, ActionResId}, ActionResId + ), + _ = maybe_install_wolff_telemetry_handlers(ActionResId), {ok, #{ message_template => compile_message_template(MessageTemplate), kafka_client_id => ClientId, - kafka_topic => KafkaTopic, + topic_template => TopicTemplate, + pre_configured_topics => KafkaTopics, producers => Producers, - resource_id => BridgeV2Id, - connector_resource_id => InstId, + resource_id => ActionResId, + connector_resource_id => ConnResId, sync_query_timeout => SyncQueryTimeout, kafka_config => KafkaConfig, headers_tokens => KafkaHeadersTokens, @@ -169,7 +193,7 @@ create_producers_for_bridge_v2( {error, Reason2} -> ?SLOG(error, #{ msg => "failed_to_start_kafka_producer", - instance_id => InstId, + instance_id => ConnResId, kafka_client_id => ClientId, kafka_topic => KafkaTopic, reason => Reason2 @@ -264,7 +288,9 @@ remove_producers_for_bridge_v2( ClientId = maps:get(?kafka_client_id, AllocatedResources, no_client_id), maps:foreach( fun - ({?kafka_producers, BridgeV2IdCheck}, Producers) when BridgeV2IdCheck =:= BridgeV2Id -> + ({?kafka_producers, BridgeV2IdCheck}, Producers) when + BridgeV2IdCheck =:= BridgeV2Id + -> deallocate_producers(ClientId, Producers); ({?kafka_telemetry_id, BridgeV2IdCheck}, TelemetryId) when BridgeV2IdCheck =:= BridgeV2Id @@ -297,8 +323,10 @@ on_query( #{installed_bridge_v2s := BridgeV2Configs} = _ConnectorState ) -> #{ - message_template := Template, + message_template := MessageTemplate, + topic_template := TopicTemplate, producers := Producers, + pre_configured_topics := PreConfiguredTopics, sync_query_timeout := SyncTimeout, headers_tokens := KafkaHeadersTokens, ext_headers_tokens := KafkaExtHeadersTokens, @@ -310,7 +338,14 @@ on_query( headers_val_encode_mode => KafkaHeadersValEncodeMode }, try - KafkaMessage = render_message(Template, KafkaHeaders, Message), + KafkaTopic = render_topic(TopicTemplate, Message), + case lists:member(KafkaTopic, PreConfiguredTopics) of + false -> + throw({unknown_topic, KafkaTopic}); + true -> + ok + end, + KafkaMessage = render_message(MessageTemplate, KafkaHeaders, Message), ?tp( emqx_bridge_kafka_impl_producer_sync_query, #{headers_config => KafkaHeaders, instance_id => InstId} @@ -318,9 +353,15 @@ on_query( emqx_trace:rendered_action_template(MessageTag, #{ message => KafkaMessage }), - do_send_msg(sync, KafkaMessage, Producers, SyncTimeout) + do_send_msg(sync, KafkaTopic, KafkaMessage, Producers, SyncTimeout) catch - error:{invalid_partition_count, Count, _Partitioner} -> + throw:bad_topic -> + ?tp("kafka_producer_failed_to_render_topic", #{}), + {error, {unrecoverable_error, failed_to_render_topic}}; + throw:{unknown_topic, Topic} -> + ?tp("kafka_producer_resolved_to_unknown_topic", #{}), + {error, {unrecoverable_error, {resolved_to_unknown_topic, Topic}}}; + throw:#{cause := invalid_partition_count, count := Count} -> ?tp("kafka_producer_invalid_partition_count", #{ action_id => MessageTag, query_mode => sync @@ -365,7 +406,9 @@ on_query_async( ) -> #{ message_template := Template, + topic_template := TopicTemplate, producers := Producers, + pre_configured_topics := PreConfiguredTopics, headers_tokens := KafkaHeadersTokens, ext_headers_tokens := KafkaExtHeadersTokens, headers_val_encode_mode := KafkaHeadersValEncodeMode @@ -376,6 +419,13 @@ on_query_async( headers_val_encode_mode => KafkaHeadersValEncodeMode }, try + KafkaTopic = render_topic(TopicTemplate, Message), + case lists:member(KafkaTopic, PreConfiguredTopics) of + false -> + throw({unknown_topic, KafkaTopic}); + true -> + ok + end, KafkaMessage = render_message(Template, KafkaHeaders, Message), ?tp( emqx_bridge_kafka_impl_producer_async_query, @@ -384,9 +434,15 @@ on_query_async( emqx_trace:rendered_action_template(MessageTag, #{ message => KafkaMessage }), - do_send_msg(async, KafkaMessage, Producers, AsyncReplyFn) + do_send_msg(async, KafkaTopic, KafkaMessage, Producers, AsyncReplyFn) catch - error:{invalid_partition_count, Count, _Partitioner} -> + throw:bad_topic -> + ?tp("kafka_producer_failed_to_render_topic", #{}), + {error, {unrecoverable_error, failed_to_render_topic}}; + throw:{unknown_topic, Topic} -> + ?tp("kafka_producer_resolved_to_unknown_topic", #{}), + {error, {unrecoverable_error, {resolved_to_unknown_topic, Topic}}}; + throw:#{cause := invalid_partition_count, count := Count} -> ?tp("kafka_producer_invalid_partition_count", #{ action_id => MessageTag, query_mode => async @@ -424,9 +480,28 @@ compile_message_template(T) -> timestamp => preproc_tmpl(TimestampTemplate) }. +maybe_preproc_topic(Topic) -> + Template = emqx_template:parse(Topic), + case emqx_template:placeholders(Template) of + [] -> + {fixed, bin(Topic)}; + [_ | _] -> + {dynamic, Template} + end. + preproc_tmpl(Tmpl) -> emqx_placeholder:preproc_tmpl(Tmpl). +render_topic({fixed, KafkaTopic}, _Message) -> + KafkaTopic; +render_topic({dynamic, Template}, Message) -> + try + iolist_to_binary(emqx_template:render_strict(Template, Message)) + catch + error:_Errors -> + throw(bad_topic) + end. + render_message( #{key := KeyTemplate, value := ValueTemplate, timestamp := TimestampTemplate}, #{ @@ -468,9 +543,11 @@ render_timestamp(Template, Message) -> erlang:system_time(millisecond) end. -do_send_msg(sync, KafkaMessage, Producers, SyncTimeout) -> +do_send_msg(sync, KafkaTopic, KafkaMessage, Producers, SyncTimeout) -> try - {_Partition, _Offset} = wolff:send_sync(Producers, [KafkaMessage], SyncTimeout), + {_Partition, _Offset} = wolff:send_sync2( + Producers, KafkaTopic, [KafkaMessage], SyncTimeout + ), ok catch error:{producer_down, _} = Reason -> @@ -478,7 +555,7 @@ do_send_msg(sync, KafkaMessage, Producers, SyncTimeout) -> error:timeout -> {error, timeout} end; -do_send_msg(async, KafkaMessage, Producers, AsyncReplyFn) -> +do_send_msg(async, KafkaTopic, KafkaMessage, Producers, AsyncReplyFn) -> %% * Must be a batch because wolff:send and wolff:send_sync are batch APIs %% * Must be a single element batch because wolff books calls, but not batch sizes %% for counters and gauges. @@ -486,7 +563,9 @@ do_send_msg(async, KafkaMessage, Producers, AsyncReplyFn) -> %% The retuned information is discarded here. %% If the producer process is down when sending, this function would %% raise an error exception which is to be caught by the caller of this callback - {_Partition, Pid} = wolff:send(Producers, Batch, {fun ?MODULE:on_kafka_ack/3, [AsyncReplyFn]}), + {_Partition, Pid} = wolff:send2( + Producers, KafkaTopic, Batch, {fun ?MODULE:on_kafka_ack/3, [AsyncReplyFn]} + ), %% this Pid is so far never used because Kafka producer is by-passing the buffer worker {ok, Pid}. @@ -527,20 +606,24 @@ on_get_status( end. on_get_channel_status( - _ResId, - ChannelId, + _ConnResId, + ActionResId, #{ client_id := ClientId, installed_bridge_v2s := Channels - } = _State + } = _ConnState ) -> %% Note: we must avoid returning `?status_disconnected' here. Returning %% `?status_disconnected' will make resource manager try to restart the producers / %% connector, thus potentially dropping data held in wolff producer's replayq. The %% only exception is if the topic does not exist ("unhealthy target"). - #{kafka_topic := KafkaTopic, partitions_limit := MaxPartitions} = maps:get(ChannelId, Channels), + #{ + pre_configured_topics := PreConfiguredTopics, + partitions_limit := MaxPartitions + } = maps:get(ActionResId, Channels), + [KafkaTopic | _] = PreConfiguredTopics, try - ok = check_topic_and_leader_connections(ClientId, KafkaTopic, MaxPartitions), + ok = check_topic_and_leader_connections(ActionResId, ClientId, KafkaTopic, MaxPartitions), ?status_connected catch throw:{unhealthy_target, Msg} -> @@ -549,11 +632,11 @@ on_get_channel_status( {?status_connecting, {K, E}} end. -check_topic_and_leader_connections(ClientId, KafkaTopic, MaxPartitions) -> +check_topic_and_leader_connections(ActionResId, ClientId, KafkaTopic, MaxPartitions) -> case wolff_client_sup:find_client(ClientId) of {ok, Pid} -> ok = check_topic_status(ClientId, Pid, KafkaTopic), - ok = check_if_healthy_leaders(ClientId, Pid, KafkaTopic, MaxPartitions); + ok = check_if_healthy_leaders(ActionResId, ClientId, Pid, KafkaTopic, MaxPartitions); {error, #{reason := no_such_client}} -> throw(#{ reason => cannot_find_kafka_client, @@ -591,8 +674,10 @@ error_summary(Map, [Error]) -> error_summary(Map, [Error | More]) -> Map#{first_error => Error, total_errors => length(More) + 1}. -check_if_healthy_leaders(ClientId, ClientPid, KafkaTopic, MaxPartitions) when is_pid(ClientPid) -> - case wolff_client:get_leader_connections(ClientPid, KafkaTopic, MaxPartitions) of +check_if_healthy_leaders(ActionResId, ClientId, ClientPid, KafkaTopic, MaxPartitions) when + is_pid(ClientPid) +-> + case wolff_client:get_leader_connections(ClientPid, ActionResId, KafkaTopic, MaxPartitions) of {ok, Leaders} -> %% Kafka is considered healthy as long as any of the partition leader is reachable. case lists:partition(fun({_Partition, Pid}) -> is_alive(Pid) end, Leaders) of @@ -654,7 +739,7 @@ ssl(#{enable := true} = SSL) -> ssl(_) -> false. -producers_config(BridgeType, BridgeName, Input, IsDryRun, BridgeV2Id) -> +producers_config(BridgeType, BridgeName, Input, IsDryRun, ActionResId) -> #{ max_batch_bytes := MaxBatchBytes, compression := Compression, @@ -696,8 +781,8 @@ producers_config(BridgeType, BridgeName, Input, IsDryRun, BridgeV2Id) -> max_batch_bytes => MaxBatchBytes, max_send_ahead => MaxInflight - 1, compression => Compression, - alias => BridgeV2Id, - telemetry_meta_data => #{bridge_id => BridgeV2Id}, + group => ActionResId, + telemetry_meta_data => #{bridge_id => ActionResId}, max_partitions => MaxPartitions }. @@ -773,20 +858,19 @@ handle_telemetry_event(_EventId, _Metrics, _MetaData, _HandlerConfig) -> %% Note: don't use the instance/manager ID, as that changes everytime %% the bridge is recreated, and will lead to multiplication of %% metrics. --spec telemetry_handler_id(resource_id()) -> binary(). -telemetry_handler_id(ResourceID) -> - <<"emqx-bridge-kafka-producer-", ResourceID/binary>>. +-spec telemetry_handler_id(action_resource_id()) -> binary(). +telemetry_handler_id(ActionResId) -> + <<"emqx-bridge-kafka-producer-", ActionResId/binary>>. -uninstall_telemetry_handlers(ResourceID) -> - HandlerID = telemetry_handler_id(ResourceID), - telemetry:detach(HandlerID). +uninstall_telemetry_handlers(TelemetryId) -> + telemetry:detach(TelemetryId). -maybe_install_wolff_telemetry_handlers(ResourceID) -> +maybe_install_wolff_telemetry_handlers(TelemetryId) -> %% Attach event handlers for Kafka telemetry events. If a handler with the %% handler id already exists, the attach_many function does nothing telemetry:attach_many( %% unique handler id - telemetry_handler_id(ResourceID), + telemetry_handler_id(TelemetryId), [ [wolff, dropped_queue_full], [wolff, queuing], @@ -798,7 +882,7 @@ maybe_install_wolff_telemetry_handlers(ResourceID) -> %% wolff producers; otherwise, multiple kafka producer bridges %% will install multiple handlers to the same wolff events, %% multiplying the metric counts... - #{bridge_id => ResourceID} + #{bridge_id => TelemetryId} ). preproc_kafka_headers(HeadersTmpl) when HeadersTmpl =:= <<>>; HeadersTmpl =:= undefined -> diff --git a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_producer_action_info.erl b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_producer_action_info.erl index d97e68ba6..b9e13e717 100644 --- a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_producer_action_info.erl +++ b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_producer_action_info.erl @@ -26,7 +26,12 @@ schema_module() -> emqx_bridge_kafka. connector_action_config_to_bridge_v1_config(ConnectorConfig, ActionConfig) -> BridgeV1Config1 = maps:remove(<<"connector">>, ActionConfig), BridgeV1Config2 = emqx_utils_maps:deep_merge(ConnectorConfig, BridgeV1Config1), - emqx_utils_maps:rename(<<"parameters">>, <<"kafka">>, BridgeV1Config2). + BridgeV1Config = emqx_utils_maps:rename(<<"parameters">>, <<"kafka">>, BridgeV1Config2), + maps:update_with( + <<"kafka">>, + fun(Params) -> maps:with(v1_parameters(), Params) end, + BridgeV1Config + ). bridge_v1_config_to_action_config(BridgeV1Conf0 = #{<<"producer">> := _}, ConnectorName) -> %% Ancient v1 config, when `kafka' key was wrapped by `producer' @@ -51,6 +56,12 @@ bridge_v1_config_to_action_config(BridgeV1Conf, ConnectorName) -> %% Internal helper functions %%------------------------------------------------------------------------------------------ +v1_parameters() -> + [ + to_bin(K) + || {K, _} <- emqx_bridge_kafka:fields(v1_producer_kafka_opts) + ]. + producer_action_field_keys() -> [ to_bin(K) diff --git a/apps/emqx_bridge_kafka/test/emqx_bridge_v2_kafka_producer_SUITE.erl b/apps/emqx_bridge_kafka/test/emqx_bridge_v2_kafka_producer_SUITE.erl index c26f5e94e..6246faaf1 100644 --- a/apps/emqx_bridge_kafka/test/emqx_bridge_v2_kafka_producer_SUITE.erl +++ b/apps/emqx_bridge_kafka/test/emqx_bridge_v2_kafka_producer_SUITE.erl @@ -23,6 +23,7 @@ -include_lib("snabbkaffe/include/snabbkaffe.hrl"). -include_lib("brod/include/brod.hrl"). -include_lib("emqx_resource/include/emqx_resource.hrl"). +-include_lib("emqx/include/asserts.hrl"). -import(emqx_common_test_helpers, [on_exit/1]). @@ -165,6 +166,9 @@ send_message(Type, ActionName) -> resolve_kafka_offset() -> KafkaTopic = emqx_bridge_kafka_impl_producer_SUITE:test_topic_one_partition(), + resolve_kafka_offset(KafkaTopic). + +resolve_kafka_offset(KafkaTopic) -> Partition = 0, Hosts = emqx_bridge_kafka_impl_producer_SUITE:kafka_hosts(), {ok, Offset0} = emqx_bridge_kafka_impl_producer_SUITE:resolve_kafka_offset( @@ -174,11 +178,32 @@ resolve_kafka_offset() -> check_kafka_message_payload(Offset, ExpectedPayload) -> KafkaTopic = emqx_bridge_kafka_impl_producer_SUITE:test_topic_one_partition(), + check_kafka_message_payload(KafkaTopic, Offset, ExpectedPayload). + +check_kafka_message_payload(KafkaTopic, Offset, ExpectedPayload) -> Partition = 0, Hosts = emqx_bridge_kafka_impl_producer_SUITE:kafka_hosts(), {ok, {_, [KafkaMsg0]}} = brod:fetch(Hosts, KafkaTopic, Partition, Offset), ?assertMatch(#kafka_message{value = ExpectedPayload}, KafkaMsg0). +ensure_kafka_topic(KafkaTopic) -> + TopicConfigs = [ + #{ + name => KafkaTopic, + num_partitions => 1, + replication_factor => 1, + assignments => [], + configs => [] + } + ], + RequestConfig = #{timeout => 5_000}, + ConnConfig = #{}, + Endpoints = emqx_bridge_kafka_impl_producer_SUITE:kafka_hosts(), + case brod:create_topics(Endpoints, TopicConfigs, RequestConfig, ConnConfig) of + ok -> ok; + {error, topic_already_exists} -> ok + end. + action_config(ConnectorName) -> action_config(ConnectorName, _Overrides = #{}). @@ -728,9 +753,13 @@ t_invalid_partition_count_metrics(Config) -> %% Simulate `invalid_partition_count' emqx_common_test_helpers:with_mock( wolff, - send_sync, - fun(_Producers, _Msgs, _Timeout) -> - error({invalid_partition_count, 0, partitioner}) + send_sync2, + fun(_Producers, _Topic, _Msgs, _Timeout) -> + throw(#{ + cause => invalid_partition_count, + count => 0, + partitioner => partitioner + }) end, fun() -> {{ok, _}, {ok, _}} = @@ -773,9 +802,13 @@ t_invalid_partition_count_metrics(Config) -> %% Simulate `invalid_partition_count' emqx_common_test_helpers:with_mock( wolff, - send, - fun(_Producers, _Msgs, _Timeout) -> - error({invalid_partition_count, 0, partitioner}) + send2, + fun(_Producers, _Topic, _Msgs, _AckCallback) -> + throw(#{ + cause => invalid_partition_count, + count => 0, + partitioner => partitioner + }) end, fun() -> {{ok, _}, {ok, _}} = @@ -881,3 +914,196 @@ t_multiple_actions_sharing_topic(Config) -> end ), ok. + +%% Smoke tests for using a templated topic and a list of pre-configured kafka topics. +t_pre_configured_topics(Config) -> + Type = proplists:get_value(type, Config, ?TYPE), + ConnectorName = proplists:get_value(connector_name, Config, <<"c">>), + ConnectorConfig = proplists:get_value(connector_config, Config, connector_config()), + ActionName = <<"pre_configured_topics">>, + ActionConfig1 = proplists:get_value(action_config, Config, action_config(ConnectorName)), + PreConfigureTopic1 = <<"pct1">>, + PreConfigureTopic2 = <<"pct2">>, + ensure_kafka_topic(PreConfigureTopic1), + ensure_kafka_topic(PreConfigureTopic2), + ActionConfig = emqx_bridge_v2_testlib:parse_and_check( + action, + Type, + ActionName, + emqx_utils_maps:deep_merge( + ActionConfig1, + #{ + <<"parameters">> => #{ + <<"topic">> => <<"pct${.payload.n}">>, + <<"message">> => #{ + <<"key">> => <<"${.clientid}">>, + <<"value">> => <<"${.payload.p}">> + }, + <<"pre_configured_topics">> => [ + #{<<"topic">> => PreConfigureTopic1}, + #{<<"topic">> => PreConfigureTopic2} + ] + } + } + ) + ), + ?check_trace( + #{timetrap => 7_000}, + begin + ConnectorParams = [ + {connector_config, ConnectorConfig}, + {connector_name, ConnectorName}, + {connector_type, Type} + ], + ActionParams = [ + {action_config, ActionConfig}, + {action_name, ActionName}, + {action_type, Type} + ], + {ok, {{_, 201, _}, _, #{}}} = + emqx_bridge_v2_testlib:create_connector_api(ConnectorParams), + + {ok, {{_, 201, _}, _, #{}}} = + emqx_bridge_v2_testlib:create_action_api(ActionParams), + RuleTopic = <<"pct">>, + {ok, _} = emqx_bridge_v2_testlib:create_rule_and_action_http( + Type, + RuleTopic, + [ + {bridge_name, ActionName} + ], + #{ + sql => + <<"select *, json_decode(payload) as payload from \"", RuleTopic/binary, + "\" ">> + } + ), + ?assertStatusAPI(Type, ActionName, <<"connected">>), + + HandlerId = ?FUNCTION_NAME, + TestPid = self(), + telemetry:attach_many( + HandlerId, + emqx_resource_metrics:events(), + fun(EventName, Measurements, Metadata, _Config) -> + Data = #{ + name => EventName, + measurements => Measurements, + metadata => Metadata + }, + TestPid ! {telemetry, Data}, + ok + end, + unused_config + ), + on_exit(fun() -> telemetry:detach(HandlerId) end), + + {ok, C} = emqtt:start_link(#{}), + {ok, _} = emqtt:connect(C), + Payload = fun(Map) -> emqx_utils_json:encode(Map) end, + Offset1 = resolve_kafka_offset(PreConfigureTopic1), + Offset2 = resolve_kafka_offset(PreConfigureTopic2), + {ok, _} = emqtt:publish(C, RuleTopic, Payload(#{n => 1, p => <<"p1">>}), [{qos, 1}]), + {ok, _} = emqtt:publish(C, RuleTopic, Payload(#{n => 2, p => <<"p2">>}), [{qos, 1}]), + + check_kafka_message_payload(PreConfigureTopic1, Offset1, <<"p1">>), + check_kafka_message_payload(PreConfigureTopic2, Offset2, <<"p2">>), + + ActionId = emqx_bridge_v2:id(Type, ActionName), + ?assertEqual(2, emqx_resource_metrics:matched_get(ActionId)), + ?assertEqual(2, emqx_resource_metrics:success_get(ActionId)), + ?assertEqual(0, emqx_resource_metrics:queuing_get(ActionId)), + + ?assertReceive( + {telemetry, #{ + measurements := #{gauge_set := _}, + metadata := #{worker_id := _, resource_id := ActionId} + }} + ), + + %% If there isn't enough information in the context to resolve to a topic, it + %% should be an unrecoverable error. + ?assertMatch( + {_, {ok, _}}, + ?wait_async_action( + emqtt:publish(C, RuleTopic, Payload(#{not_enough => <<"info">>}), [{qos, 1}]), + #{?snk_kind := "kafka_producer_failed_to_render_topic"} + ) + ), + + %% If it's possible to render the topic, but it isn't in the pre-configured + %% list, it should be an unrecoverable error. + ?assertMatch( + {_, {ok, _}}, + ?wait_async_action( + emqtt:publish(C, RuleTopic, Payload(#{n => 99}), [{qos, 1}]), + #{?snk_kind := "kafka_producer_resolved_to_unknown_topic"} + ) + ), + + ok + end, + [] + ), + ok. + +%% Checks that creating an action with templated topic and no pre-configured kafka topics +%% throws. +t_templated_topic_and_no_pre_configured_topics(Config) -> + Type = proplists:get_value(type, Config, ?TYPE), + ConnectorName = proplists:get_value(connector_name, Config, <<"c">>), + ConnectorConfig = proplists:get_value(connector_config, Config, connector_config()), + ActionName = <<"bad_pre_configured_topics">>, + ActionConfig1 = proplists:get_value(action_config, Config, action_config(ConnectorName)), + ActionConfig = emqx_bridge_v2_testlib:parse_and_check( + action, + Type, + ActionName, + emqx_utils_maps:deep_merge( + ActionConfig1, + #{ + <<"parameters">> => #{ + <<"topic">> => <<"pct${.payload.n}">>, + <<"pre_configured_topics">> => [] + } + } + ) + ), + ?check_trace( + #{timetrap => 7_000}, + begin + ConnectorParams = [ + {connector_config, ConnectorConfig}, + {connector_name, ConnectorName}, + {connector_type, Type} + ], + ActionParams = [ + {action_config, ActionConfig}, + {action_name, ActionName}, + {action_type, Type} + ], + {ok, {{_, 201, _}, _, #{}}} = + emqx_bridge_v2_testlib:create_connector_api(ConnectorParams), + + {ok, {{_, 201, _}, _, #{}}} = + emqx_bridge_v2_testlib:create_action_api(ActionParams), + + ?assertMatch( + {ok, + {{_, 200, _}, _, #{ + <<"status_reason">> := + << + "Either the Kafka topic must be fixed (not a template)," + " or at least one pre-defined topic must be set." + >>, + <<"status">> := <<"disconnected">>, + <<"node_status">> := [#{<<"status">> := <<"disconnected">>}] + }}}, + emqx_bridge_v2_testlib:get_bridge_api(Type, ActionName) + ), + + ok + end, + [] + ), + ok. diff --git a/changes/ee/feat-13452.en.md b/changes/ee/feat-13452.en.md new file mode 100644 index 000000000..95dae8d32 --- /dev/null +++ b/changes/ee/feat-13452.en.md @@ -0,0 +1 @@ +Added to possibility to configure a list of predefined Kafka topics to Kafka producer actions, and also to use templates to define the destination Kafka topic. diff --git a/rel/i18n/emqx_bridge_azure_event_hub.hocon b/rel/i18n/emqx_bridge_azure_event_hub.hocon index 3b96e23e6..e683bc9e9 100644 --- a/rel/i18n/emqx_bridge_azure_event_hub.hocon +++ b/rel/i18n/emqx_bridge_azure_event_hub.hocon @@ -69,7 +69,7 @@ producer_kafka_opts.label: """Azure Event Hubs Producer""" kafka_topic.desc: -"""Event Hubs name""" +"""Event Hubs name. Supports templates (e.g.: `t-${payload.t}`).""" kafka_topic.label: """Event Hubs Name""" @@ -350,4 +350,14 @@ Setting this to a value which is greater than the total number of partitions in partitions_limit.label: """Max Partitions""" +producer_pre_configured_topics.label: +"""Pre-configured Event Hubs""" +producer_pre_configured_topics.desc: +"""A list of pre-configured event hubs to be used when using templates to define outgoing topics. If the topic template fails to resolve to a value due to missing data in the incoming message, or if it resolves to a topic that is not contained in this list, then publishing will fail.""" + +pre_configured_topic.label: +"""Event Hubs Name""" +pre_configured_topic.desc: +"""Event Hubs name""" + } diff --git a/rel/i18n/emqx_bridge_confluent_producer.hocon b/rel/i18n/emqx_bridge_confluent_producer.hocon index 748373691..81c2c0a89 100644 --- a/rel/i18n/emqx_bridge_confluent_producer.hocon +++ b/rel/i18n/emqx_bridge_confluent_producer.hocon @@ -69,10 +69,10 @@ producer_kafka_opts.label: """Confluent Producer""" kafka_topic.desc: -"""Event Hub name""" +"""Kafka topic name. Supports templates (e.g.: `t-${payload.t}`).""" kafka_topic.label: -"""Event Hub Name""" +"""Kafka Topic Name""" kafka_message_timestamp.desc: """Which timestamp to use. The timestamp is expected to be a millisecond precision Unix epoch which can be in string format, e.g. 1661326462115 or '1661326462115'. When the desired data field for this template is not found, or if the found data is not a valid integer, the current system timestamp will be used.""" @@ -350,4 +350,14 @@ server_name_indication.desc: server_name_indication.label: """SNI""" +producer_pre_configured_topics.label: +"""Pre-configured Topics""" +producer_pre_configured_topics.desc: +"""A list of pre-configured topics to be used when using templates to define outgoing topics. If the topic template fails to resolve to a value due to missing data in the incoming message, or if it resolves to a topic that is not contained in this list, then publishing will fail.""" + +pre_configured_topic.label: +"""Kafka Topic Name""" +pre_configured_topic.desc: +"""Kafka topic name""" + } diff --git a/rel/i18n/emqx_bridge_kafka.hocon b/rel/i18n/emqx_bridge_kafka.hocon index 6e0074ddd..59896cc22 100644 --- a/rel/i18n/emqx_bridge_kafka.hocon +++ b/rel/i18n/emqx_bridge_kafka.hocon @@ -81,7 +81,7 @@ producer_kafka_opts.label: """Kafka Producer""" kafka_topic.desc: -"""Kafka topic name""" +"""Kafka topic name. Supports templates (e.g.: `t-${payload.t}`).""" kafka_topic.label: """Kafka Topic Name""" @@ -446,5 +446,14 @@ server_name_indication.desc: server_name_indication.label: """SNI""" +producer_pre_configured_topics.label: +"""Pre-configured Topics""" +producer_pre_configured_topics.desc: +"""A list of pre-configured topics to be used when using templates to define outgoing topics. If the topic template fails to resolve to a value due to missing data in the incoming message, or if it resolves to a topic that is not contained in this list, then publishing will fail.""" + +pre_configured_topic.label: +"""Kafka Topic Name""" +pre_configured_topic.desc: +"""Kafka topic name""" }