From 9900a32850f75319dfdbaffb3acf0ed5b7872d05 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Mon, 7 Aug 2023 11:39:05 -0300 Subject: [PATCH] feat(kafka_consumer): add mqtt topic placeholder support Fixes https://emqx.atlassian.net/browse/EMQX-10678 --- .../src/emqx_bridge_kafka.app.src | 2 +- .../src/emqx_bridge_kafka.erl | 2 +- .../src/emqx_bridge_kafka_impl_consumer.erl | 12 ++- .../emqx_bridge_kafka_impl_consumer_SUITE.erl | 93 ++++++++++++++++++- changes/ee/feat-11402.en.md | 1 + 5 files changed, 102 insertions(+), 8 deletions(-) create mode 100644 changes/ee/feat-11402.en.md diff --git a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.app.src b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.app.src index 7157a1580..3792409c6 100644 --- a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.app.src +++ b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.app.src @@ -1,7 +1,7 @@ %% -*- mode: erlang -*- {application, emqx_bridge_kafka, [ {description, "EMQX Enterprise Kafka Bridge"}, - {vsn, "0.1.6"}, + {vsn, "0.1.7"}, {registered, [emqx_bridge_kafka_consumer_sup]}, {applications, [ kernel, diff --git a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl index eeaa7d4b7..c2388f508 100644 --- a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl +++ b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl @@ -125,7 +125,7 @@ values(consumer) -> topic_mapping => [ #{ kafka_topic => <<"kafka-topic-1">>, - mqtt_topic => <<"mqtt/topic/1">>, + mqtt_topic => <<"mqtt/topic/${.offset}">>, qos => 1, payload_template => <<"${.}">> }, diff --git a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_consumer.erl b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_consumer.erl index e18bf7e29..b8abb928c 100644 --- a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_consumer.erl +++ b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_consumer.erl @@ -69,7 +69,7 @@ topic_mapping := #{ kafka_topic() := #{ payload_template := emqx_placeholder:tmpl_token(), - mqtt_topic => emqx_types:topic(), + mqtt_topic_template => emqx_placeholder:tmpl_token(), qos => emqx_types:qos() } }, @@ -83,7 +83,7 @@ topic_mapping := #{ kafka_topic() := #{ payload_template := emqx_placeholder:tmpl_token(), - mqtt_topic => emqx_types:topic(), + mqtt_topic_template => emqx_placeholder:tmpl_token(), qos => emqx_types:qos() } }, @@ -235,7 +235,7 @@ do_handle_message(Message, State) -> value_encoding_mode := ValueEncodingMode } = State, #{ - mqtt_topic := MQTTTopic, + mqtt_topic_template := MQTTTopicTemplate, qos := MQTTQoS, payload_template := PayloadTemplate } = maps:get(KafkaTopic, TopicMapping), @@ -249,6 +249,7 @@ do_handle_message(Message, State) -> value => encode(Message#kafka_message.value, ValueEncodingMode) }, Payload = render(FullMessage, PayloadTemplate), + MQTTTopic = render(FullMessage, MQTTTopicTemplate), MQTTMessage = emqx_message:make(ResourceId, MQTTQoS, MQTTTopic, Payload), _ = emqx:publish(MQTTMessage), emqx:run_hook(Hookpoint, [FullMessage]), @@ -533,15 +534,16 @@ convert_topic_mapping(TopicMappingList) -> fun(Fields, Acc) -> #{ kafka_topic := KafkaTopic, - mqtt_topic := MQTTTopic, + mqtt_topic := MQTTTopicTemplate0, qos := QoS, payload_template := PayloadTemplate0 } = Fields, PayloadTemplate = emqx_placeholder:preproc_tmpl(PayloadTemplate0), + MQTTTopicTemplate = emqx_placeholder:preproc_tmpl(MQTTTopicTemplate0), Acc#{ KafkaTopic => #{ payload_template => PayloadTemplate, - mqtt_topic => MQTTTopic, + mqtt_topic_template => MQTTTopicTemplate, qos => QoS } } diff --git a/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_consumer_SUITE.erl b/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_consumer_SUITE.erl index f1f2ce362..2d8355e8e 100644 --- a/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_consumer_SUITE.erl +++ b/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_consumer_SUITE.erl @@ -60,6 +60,7 @@ only_once_tests() -> t_node_joins_existing_cluster, t_cluster_node_down, t_multiple_topic_mappings, + t_dynamic_mqtt_topic, t_resource_manager_crash_after_subscriber_started, t_resource_manager_crash_before_subscriber_started ]. @@ -329,6 +330,23 @@ init_per_testcase(t_multiple_topic_mappings = TestCase, Config0) -> ], Config = [{topic_mapping, TopicMapping} | Config0], common_init_per_testcase(TestCase, Config); +init_per_testcase(t_dynamic_mqtt_topic = TestCase, Config0) -> + KafkaTopic = + << + (atom_to_binary(TestCase))/binary, + (integer_to_binary(erlang:unique_integer()))/binary + >>, + TopicMapping = + [ + #{ + kafka_topic => KafkaTopic, + mqtt_topic => <<"${.topic}/${.value}/${.headers.hkey}">>, + qos => 1, + payload_template => <<"${.}">> + } + ], + Config = [{kafka_topic, KafkaTopic}, {topic_mapping, TopicMapping} | Config0], + common_init_per_testcase(TestCase, Config); init_per_testcase(TestCase, Config) -> common_init_per_testcase(TestCase, Config). @@ -336,11 +354,12 @@ common_init_per_testcase(TestCase, Config0) -> ct:timetrap(timer:seconds(60)), delete_all_bridges(), emqx_config:delete_override_conf_files(), - KafkaTopic = + KafkaTopic0 = << (atom_to_binary(TestCase))/binary, (integer_to_binary(erlang:unique_integer()))/binary >>, + KafkaTopic = proplists:get_value(kafka_topic, Config0, KafkaTopic0), KafkaType = ?config(kafka_type, Config0), UniqueNum = integer_to_binary(erlang:unique_integer()), MQTTTopic = proplists:get_value(mqtt_topic, Config0, <<"mqtt/topic/", UniqueNum/binary>>), @@ -1674,6 +1693,78 @@ t_bridge_rule_action_source(Config) -> ), ok. +t_dynamic_mqtt_topic(Config) -> + KafkaTopic = ?config(kafka_topic, Config), + NPartitions = ?config(num_partitions, Config), + ResourceId = resource_id(Config), + Payload = emqx_guid:to_hexstr(emqx_guid:gen()), + MQTTTopic = emqx_topic:join([KafkaTopic, '#']), + ?check_trace( + begin + ?assertMatch( + {ok, _}, + create_bridge(Config) + ), + wait_until_subscribers_are_ready(NPartitions, 40_000), + {ok, C} = emqtt:start_link(), + on_exit(fun() -> emqtt:stop(C) end), + {ok, _} = emqtt:connect(C), + {ok, _, [0]} = emqtt:subscribe(C, MQTTTopic), + ct:pal("subscribed to ~p", [MQTTTopic]), + + {ok, SRef0} = snabbkaffe:subscribe( + ?match_event(#{ + ?snk_kind := kafka_consumer_handle_message, ?snk_span := {complete, _} + }), + _NumMsgs = 3, + 20_000 + ), + {_Partition, _OffsetReply} = + publish(Config, [ + %% this will have the last segment defined + #{ + key => <<"mykey">>, + value => Payload, + headers => [{<<"hkey">>, <<"hvalue">>}] + }, + %% this will not + #{ + key => <<"mykey">>, + value => Payload + }, + %% will inject an invalid topic segment + #{ + key => <<"mykey">>, + value => <<"+">> + } + ]), + {ok, _} = snabbkaffe:receive_events(SRef0), + ok + end, + fun(Trace) -> + ?assertMatch([_Enter, _Complete | _], ?of_kind(kafka_consumer_handle_message, Trace)), + %% the message with invalid topic will fail to be published + Published = receive_published(#{n => 2}), + ExpectedMQTTTopic0 = emqx_topic:join([KafkaTopic, Payload, <<"hvalue">>]), + ExpectedMQTTTopic1 = emqx_topic:join([KafkaTopic, Payload, <<>>]), + ?assertMatch( + [ + #{ + topic := ExpectedMQTTTopic0 + }, + #{ + topic := ExpectedMQTTTopic1 + } + ], + Published + ), + ?assertEqual(3, emqx_resource_metrics:received_get(ResourceId)), + ?assertError({timeout, _}, receive_published(#{timeout => 500})), + ok + end + ), + ok. + %% checks that an existing cluster can be configured with a kafka %% consumer bridge and that the consumers will distribute over the two %% nodes. diff --git a/changes/ee/feat-11402.en.md b/changes/ee/feat-11402.en.md new file mode 100644 index 000000000..7d6090b58 --- /dev/null +++ b/changes/ee/feat-11402.en.md @@ -0,0 +1 @@ +Added support for using placeholders to define MQTT Topic in Kafka Consumer bridge topic mappings.