diff --git a/lib-ee/emqx_ee_bridge/i18n/emqx_ee_bridge_kafka.conf b/lib-ee/emqx_ee_bridge/i18n/emqx_ee_bridge_kafka.conf index 636573d07..ed88a1e0d 100644 --- a/lib-ee/emqx_ee_bridge/i18n/emqx_ee_bridge_kafka.conf +++ b/lib-ee/emqx_ee_bridge/i18n/emqx_ee_bridge_kafka.conf @@ -536,17 +536,33 @@ emqx_ee_bridge_kafka { } consumer_mqtt_payload { desc { - en: "The payload of the MQTT message to be published.\n" - "full_message will encode available Kafka message attributes as a JSON object, including Key, Value, Timestamp and Headers" - "message_value will directly use the Kafka message value as the " - "MQTT message payload." - zh: "要发布的MQTT消息的有效载荷。" - "full_message将把所有可用数据编码为JSON对象,包括 Key,Value,Timestamp 和 Headers。" - "message_value将直接使用 Kafka 消息值作为MQTT消息的 Payload。" + en: "The template for transforming the incoming Kafka message." + " By default, it will use JSON format to serialize all visible" + " inputs from the Kafka message. Such fields are:\n" + "headers: an object containing string key-value pairs.\n" + "key: Kafka message key (uses the chosen key encoding).\n" + "offset: offset for the message.\n" + "topic: Kafka topic.\n" + "ts: message timestamp.\n" + "ts_type: message timestamp type, which is one of" + " create, append or undefined.\n" + "value: Kafka message value (uses the chosen value encoding).\n" + zh: "用于转换传入的Kafka消息的模板。 " + "默认情况下,它将使用JSON格式来序列化所有来自Kafka消息的可见输入。 " + "这样的字段是。" + "headers: 一个包含字符串键值对的对象。\n" + "key: Kafka消息密钥(使用选择的密钥编码)。\n" + "offset: 信息的偏移量。\n" + "topic: 卡夫卡主题。\n" + "ts: 消息的时间戳。\n" + "ts_type: 消息的时间戳类型,它是一个" + " createappendundefined。\n" + "value: Kafka消息值(使用选择的值编码)。\n" + } label { - en: "MQTT Payload" - zh: "MQTT Payload" + en: "MQTT Payload Template" + zh: "MQTT Payload Template" } } consumer_kafka_topic { @@ -603,4 +619,29 @@ emqx_ee_bridge_kafka { zh: "偏移承诺间隔" } } + consumer_topic_mapping { + desc { + en: "Defines the mapping between Kafka topics and MQTT topics. Must contain at least one item." + zh: "定义了Kafka主题和MQTT主题之间的映射。 必须至少包含一个项目。" + } + label { + en: "Topic Mapping" + zh: "主题图" + } + } + consumer_encoding_mode { + desc { + en: "Defines how the key or value from the Kafka message is" + " dealt with before being forwarded via MQTT.\n" + "force_utf8 Uses UTF-8 encoding directly from the original message.\n" + "base64 Uses base-64 encoding on the received key or value." + zh: "定义了在通过MQTT转发之前如何处理Kafka消息的键或值。" + "force_utf8 直接使用原始信息的UTF-8编码。\n" + "base64 对收到的密钥或值使用base-64编码。" + } + label { + en: "Encoding Mode" + zh: "编码模式" + } + } } diff --git a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_kafka.erl b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_kafka.erl index 583acc48d..de67a73c6 100644 --- a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_kafka.erl +++ b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_kafka.erl @@ -263,26 +263,44 @@ fields(producer_buffer) -> fields(consumer_opts) -> [ {kafka, - mk(ref(consumer_kafka_opts), #{required => true, desc => ?DESC(consumer_kafka_opts)})}, - {mqtt, mk(ref(consumer_mqtt_opts), #{required => true, desc => ?DESC(consumer_mqtt_opts)})} - ]; -fields(consumer_mqtt_opts) -> - [ - {topic, - mk(binary(), #{ - required => true, - desc => ?DESC(consumer_mqtt_topic) - })}, - {qos, mk(emqx_schema:qos(), #{default => 0, desc => ?DESC(consumer_mqtt_qos)})}, - {payload, + mk(ref(consumer_kafka_opts), #{required => false, desc => ?DESC(consumer_kafka_opts)})}, + {topic_mapping, mk( - enum([full_message, message_value]), - #{default => full_message, desc => ?DESC(consumer_mqtt_payload)} + hoconsc:array(ref(consumer_topic_mapping)), + #{ + required => true, + desc => ?DESC(consumer_topic_mapping), + validator => + fun + ([]) -> + {error, "There must be at least one Kafka-MQTT topic mapping"}; + ([_ | _]) -> + ok + end + } + )}, + {key_encoding_mode, + mk(enum([force_utf8, base64]), #{ + default => force_utf8, desc => ?DESC(consumer_encoding_mode) + })}, + {value_encoding_mode, + mk(enum([force_utf8, base64]), #{ + default => force_utf8, desc => ?DESC(consumer_encoding_mode) + })} + ]; +fields(consumer_topic_mapping) -> + [ + {kafka_topic, mk(binary(), #{required => true, desc => ?DESC(consumer_kafka_topic)})}, + {mqtt_topic, mk(binary(), #{required => true, desc => ?DESC(consumer_mqtt_topic)})}, + {qos, mk(emqx_schema:qos(), #{default => 0, desc => ?DESC(consumer_mqtt_qos)})}, + {payload_template, + mk( + string(), + #{default => <<"${.}">>, desc => ?DESC(consumer_mqtt_payload)} )} ]; fields(consumer_kafka_opts) -> [ - {topic, mk(binary(), #{desc => ?DESC(consumer_kafka_topic)})}, {max_batch_bytes, mk(emqx_schema:bytesize(), #{ default => "896KB", desc => ?DESC(consumer_max_batch_bytes) @@ -330,7 +348,7 @@ struct_names() -> producer_opts, consumer_opts, consumer_kafka_opts, - consumer_mqtt_opts + consumer_topic_mapping ]. %% ------------------------------------------------------------------------------------------------- diff --git a/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_consumer.erl b/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_consumer.erl index 43717dd89..99877ca8e 100644 --- a/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_consumer.erl +++ b/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_consumer.erl @@ -41,31 +41,59 @@ offset_reset_policy := offset_reset_policy(), topic := binary() }, - mqtt := #{ - topic := emqx_types:topic(), - qos := emqx_types:qos(), - payload := mqtt_payload() - }, + topic_mapping := nonempty_list( + #{ + kafka_topic := kafka_topic(), + mqtt_topic := emqx_types:topic(), + qos := emqx_types:qos(), + payload_template := string() + } + ), ssl := _, any() => term() }. -type subscriber_id() :: emqx_ee_bridge_kafka_consumer_sup:child_id(). +-type kafka_topic() :: brod:topic(). -type state() :: #{ - kafka_topic := binary(), + kafka_topics := nonempty_list(kafka_topic()), subscriber_id := subscriber_id(), kafka_client_id := brod:client_id() }. -type offset_reset_policy() :: reset_to_latest | reset_to_earliest | reset_by_subscriber. --type mqtt_payload() :: full_message | message_value. --type consumer_state() :: #{ - resource_id := resource_id(), - mqtt := #{ - payload := mqtt_payload(), - topic => emqx_types:topic(), - qos => emqx_types:qos() - }, +%% -type mqtt_payload() :: full_message | message_value. +-type encoding_mode() :: force_utf8 | base64. +-type consumer_init_data() :: #{ hookpoint := binary(), - kafka_topic := binary() + key_encoding_mode := encoding_mode(), + resource_id := resource_id(), + topic_mapping := #{ + kafka_topic() := #{ + payload_template := emqx_plugin_libs_rule:tmpl_token(), + mqtt_topic => emqx_types:topic(), + qos => emqx_types:qos() + } + }, + value_encoding_mode := encoding_mode() +}. +-type consumer_state() :: #{ + hookpoint := binary(), + kafka_topic := binary(), + key_encoding_mode := encoding_mode(), + resource_id := resource_id(), + topic_mapping := #{ + kafka_topic() := #{ + payload_template := emqx_plugin_libs_rule:tmpl_token(), + mqtt_topic => emqx_types:topic(), + qos => emqx_types:qos() + } + }, + value_encoding_mode := encoding_mode() +}. +-type subscriber_init_info() :: #{ + topic => brod:topic(), + parition => brod:partition(), + group_id => brod:group_id(), + commit_fun => brod_group_subscriber_v2:commit_fun() }. %%------------------------------------------------------------------------------------- @@ -92,11 +120,10 @@ on_start(InstanceId, Config) -> max_batch_bytes := _, max_rejoin_attempts := _, offset_commit_interval_seconds := _, - offset_reset_policy := _, - topic := _ + offset_reset_policy := _ }, - mqtt := #{topic := _, qos := _, payload := _}, - ssl := SSL + ssl := SSL, + topic_mapping := _ } = Config, BootstrapHosts = emqx_bridge_impl_kafka:hosts(BootstrapHosts0), KafkaType = kafka_consumer, @@ -145,22 +172,19 @@ on_get_status(_InstanceID, State) -> #{ subscriber_id := SubscriberId, kafka_client_id := ClientID, - kafka_topic := KafkaTopic + kafka_topics := KafkaTopics } = State, - case brod:get_partitions_count(ClientID, KafkaTopic) of - {ok, NPartitions} -> - do_get_status(ClientID, KafkaTopic, SubscriberId, NPartitions); - _ -> - disconnected - end. + do_get_status(ClientID, KafkaTopics, SubscriberId). %%------------------------------------------------------------------------------------- %% `brod_group_subscriber' API %%------------------------------------------------------------------------------------- --spec init(_, consumer_state()) -> {ok, consumer_state()}. -init(_GroupData, State) -> - ?tp(kafka_consumer_subscriber_init, #{group_data => _GroupData, state => State}), +-spec init(subscriber_init_info(), consumer_init_data()) -> {ok, consumer_state()}. +init(GroupData, State0) -> + ?tp(kafka_consumer_subscriber_init, #{group_data => GroupData, state => State0}), + #{topic := KafkaTopic} = GroupData, + State = State0#{kafka_topic => KafkaTopic}, {ok, State}. -spec handle_message(#kafka_message{}, consumer_state()) -> {ok, commit, consumer_state()}. @@ -173,33 +197,29 @@ handle_message(Message, State) -> do_handle_message(Message, State) -> #{ - resource_id := ResourceId, hookpoint := Hookpoint, kafka_topic := KafkaTopic, - mqtt := #{ - topic := MQTTTopic, - payload := MQTTPayload, - qos := MQTTQoS - } + key_encoding_mode := KeyEncodingMode, + resource_id := ResourceId, + topic_mapping := TopicMapping, + value_encoding_mode := ValueEncodingMode } = State, + #{ + mqtt_topic := MQTTTopic, + qos := MQTTQoS, + payload_template := PayloadTemplate + } = maps:get(KafkaTopic, TopicMapping), FullMessage = #{ + headers => maps:from_list(Message#kafka_message.headers), + key => encode(Message#kafka_message.key, KeyEncodingMode), offset => Message#kafka_message.offset, - key => Message#kafka_message.key, - value => Message#kafka_message.value, + topic => KafkaTopic, ts => Message#kafka_message.ts, ts_type => Message#kafka_message.ts_type, - headers => maps:from_list(Message#kafka_message.headers), - topic => KafkaTopic + value => encode(Message#kafka_message.value, ValueEncodingMode) }, - Payload = - case MQTTPayload of - full_message -> - FullMessage; - message_value -> - Message#kafka_message.value - end, - EncodedPayload = emqx_json:encode(Payload), - MQTTMessage = emqx_message:make(ResourceId, MQTTQoS, MQTTTopic, EncodedPayload), + Payload = render(FullMessage, PayloadTemplate), + MQTTMessage = emqx_message:make(ResourceId, MQTTQoS, MQTTTopic, Payload), _ = emqx:publish(MQTTMessage), emqx:run_hook(Hookpoint, [FullMessage]), emqx_resource_metrics:received_inc(ResourceId), @@ -251,21 +271,20 @@ start_consumer(Config, InstanceId, ClientID) -> max_batch_bytes := MaxBatchBytes, max_rejoin_attempts := MaxRejoinAttempts, offset_commit_interval_seconds := OffsetCommitInterval, - offset_reset_policy := OffsetResetPolicy, - topic := KafkaTopic + offset_reset_policy := OffsetResetPolicy }, - mqtt := #{topic := MQTTTopic, qos := MQTTQoS, payload := MQTTPayload} + key_encoding_mode := KeyEncodingMode, + topic_mapping := TopicMapping0, + value_encoding_mode := ValueEncodingMode } = Config, ok = ensure_consumer_supervisor_started(), + TopicMapping = convert_topic_mapping(TopicMapping0), InitialState = #{ - resource_id => emqx_bridge_resource:resource_id(kafka_consumer, BridgeName), - mqtt => #{ - payload => MQTTPayload, - topic => MQTTTopic, - qos => MQTTQoS - }, + key_encoding_mode => KeyEncodingMode, hookpoint => Hookpoint, - kafka_topic => KafkaTopic + resource_id => emqx_bridge_resource:resource_id(kafka_consumer, BridgeName), + topic_mapping => TopicMapping, + value_encoding_mode => ValueEncodingMode }, %% note: the group id should be the same for all nodes in the %% cluster, so that the load gets distributed between all @@ -279,11 +298,12 @@ start_consumer(Config, InstanceId, ClientID) -> {max_rejoin_attempts, MaxRejoinAttempts}, {offset_commit_interval_seconds, OffsetCommitInterval} ], + KafkaTopics = maps:keys(TopicMapping), GroupSubscriberConfig = #{ client => ClientID, group_id => GroupID, - topics => [KafkaTopic], + topics => KafkaTopics, cb_module => ?MODULE, init_data => InitialState, message_type => message, @@ -304,14 +324,13 @@ start_consumer(Config, InstanceId, ClientID) -> {ok, #{ subscriber_id => SubscriberId, kafka_client_id => ClientID, - kafka_topic => KafkaTopic + kafka_topics => KafkaTopics }}; {error, Reason2} -> ?SLOG(error, #{ msg => "failed_to_start_kafka_consumer", instance_id => InstanceId, kafka_hosts => emqx_bridge_impl_kafka:hosts(BootstrapHosts0), - kafka_topic => KafkaTopic, reason => emqx_misc:redact(Reason2) }), stop_client(ClientID), @@ -344,6 +363,19 @@ stop_client(ClientID) -> ), ok. +do_get_status(ClientID, [KafkaTopic | RestTopics], SubscriberId) -> + case brod:get_partitions_count(ClientID, KafkaTopic) of + {ok, NPartitions} -> + case do_get_status(ClientID, KafkaTopic, SubscriberId, NPartitions) of + connected -> do_get_status(ClientID, RestTopics, SubscriberId); + disconnected -> disconnected + end; + _ -> + disconnected + end; +do_get_status(_ClientID, _KafkaTopics = [], _SubscriberId) -> + connected. + -spec do_get_status(brod:client_id(), binary(), subscriber_id(), pos_integer()) -> connected | disconnected. do_get_status(ClientID, KafkaTopic, SubscriberId, NPartitions) -> @@ -424,5 +456,44 @@ make_client_id(InstanceId, KafkaType, KafkaName) -> probing_brod_consumers end. +convert_topic_mapping(TopicMappingList) -> + lists:foldl( + fun(Fields, Acc) -> + #{ + kafka_topic := KafkaTopic, + mqtt_topic := MQTTTopic, + qos := QoS, + payload_template := PayloadTemplate0 + } = Fields, + PayloadTemplate = emqx_plugin_libs_rule:preproc_tmpl(PayloadTemplate0), + Acc#{ + KafkaTopic => #{ + payload_template => PayloadTemplate, + mqtt_topic => MQTTTopic, + qos => QoS + } + } + end, + #{}, + TopicMappingList + ). + +render(FullMessage, PayloadTemplate) -> + Opts = #{ + return => full_binary, + var_trans => fun + (undefined) -> + <<>>; + (X) -> + emqx_plugin_libs_rule:bin(X) + end + }, + emqx_plugin_libs_rule:proc_tmpl(PayloadTemplate, FullMessage, Opts). + +encode(Value, force_utf8) -> + Value; +encode(Value, base64) -> + base64:encode(Value). + to_bin(B) when is_binary(B) -> B; to_bin(A) when is_atom(A) -> atom_to_binary(A, utf8). diff --git a/lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_consumer_SUITE.erl b/lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_consumer_SUITE.erl index e0be06e1a..129011862 100644 --- a/lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_consumer_SUITE.erl +++ b/lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_consumer_SUITE.erl @@ -10,6 +10,7 @@ -include_lib("common_test/include/ct.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl"). -include_lib("brod/include/brod.hrl"). +-include_lib("emqx/include/emqx_mqtt.hrl"). -import(emqx_common_test_helpers, [on_exit/1]). @@ -53,7 +54,8 @@ sasl_only_tests() -> only_once_tests() -> [ t_bridge_rule_action_source, - t_cluster_group + t_cluster_group, + t_multiple_topic_mappings ]. init_per_suite(Config) -> @@ -284,6 +286,32 @@ init_per_testcase(TestCase, Config) when init_per_testcase(t_cluster_group = TestCase, Config0) -> Config = emqx_misc:merge_opts(Config0, [{num_partitions, 6}]), common_init_per_testcase(TestCase, Config); +init_per_testcase(t_multiple_topic_mappings = TestCase, Config0) -> + KafkaTopicBase = + << + (atom_to_binary(TestCase))/binary, + (integer_to_binary(erlang:unique_integer()))/binary + >>, + MQTTTopicBase = + <<"mqtt/", (atom_to_binary(TestCase))/binary, + (integer_to_binary(erlang:unique_integer()))/binary, "/">>, + TopicMapping = + [ + #{ + kafka_topic => <>, + mqtt_topic => <>, + qos => 1, + payload_template => <<"${.}">> + }, + #{ + kafka_topic => <>, + mqtt_topic => <>, + qos => 2, + payload_template => <<"v = ${.value}">> + } + ], + Config = [{topic_mapping, TopicMapping} | Config0], + common_init_per_testcase(TestCase, Config); init_per_testcase(TestCase, Config) -> common_init_per_testcase(TestCase, Config). @@ -295,23 +323,35 @@ common_init_per_testcase(TestCase, Config0) -> (atom_to_binary(TestCase))/binary, (integer_to_binary(erlang:unique_integer()))/binary >>, - Config = [{kafka_topic, KafkaTopic} | Config0], - KafkaType = ?config(kafka_type, Config), + KafkaType = ?config(kafka_type, Config0), + UniqueNum = integer_to_binary(erlang:unique_integer()), + MQTTTopic = proplists:get_value(mqtt_topic, Config0, <<"mqtt/topic/", UniqueNum/binary>>), + MQTTQoS = proplists:get_value(mqtt_qos, Config0, 0), + DefaultTopicMapping = [ + #{ + kafka_topic => KafkaTopic, + mqtt_topic => MQTTTopic, + qos => MQTTQoS, + payload_template => <<"${.}">> + } + ], + TopicMapping = proplists:get_value(topic_mapping, Config0, DefaultTopicMapping), + Config = [ + {kafka_topic, KafkaTopic}, + {topic_mapping, TopicMapping} + | Config0 + ], {Name, ConfigString, KafkaConfig} = kafka_config( TestCase, KafkaType, Config ), - ensure_topic(Config), - #{ - producers := Producers, - clientid := KafkaProducerClientId - } = start_producer(TestCase, Config), + ensure_topics(Config), + ProducersConfigs = start_producers(TestCase, Config), ok = snabbkaffe:start_trace(), [ {kafka_name, Name}, {kafka_config_string, ConfigString}, {kafka_config, KafkaConfig}, - {kafka_producers, Producers}, - {kafka_producer_clientid, KafkaProducerClientId} + {kafka_producers, ProducersConfigs} | Config ]. @@ -323,11 +363,17 @@ end_per_testcase(_Testcase, Config) -> false -> ProxyHost = ?config(proxy_host, Config), ProxyPort = ?config(proxy_port, Config), - Producers = ?config(kafka_producers, Config), - KafkaProducerClientId = ?config(kafka_producer_clientid, Config), + ProducersConfigs = ?config(kafka_producers, Config), emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort), delete_all_bridges(), - ok = wolff:stop_and_delete_supervised_producers(Producers), + #{clientid := KafkaProducerClientId, producers := ProducersMapping} = + ProducersConfigs, + lists:foreach( + fun(Producers) -> + ok = wolff:stop_and_delete_supervised_producers(Producers) + end, + maps:values(ProducersMapping) + ), ok = wolff:stop_and_delete_supervised_client(KafkaProducerClientId), emqx_common_test_helpers:call_janitor(), ok = snabbkaffe:stop(), @@ -338,8 +384,8 @@ end_per_testcase(_Testcase, Config) -> %% Helper fns %%------------------------------------------------------------------------------ -start_producer(TestCase, Config) -> - KafkaTopic = ?config(kafka_topic, Config), +start_producers(TestCase, Config) -> + TopicMapping = ?config(topic_mapping, Config), KafkaClientId = <<"test-client-", (atom_to_binary(TestCase))/binary, (integer_to_binary(erlang:unique_integer()))/binary>>, @@ -381,9 +427,27 @@ start_producer(TestCase, Config) -> ssl => SSL }, {ok, Clients} = wolff:ensure_supervised_client(KafkaClientId, Hosts, ClientConfig), + ProducersData0 = + #{ + clients => Clients, + clientid => KafkaClientId, + producers => #{} + }, + lists:foldl( + fun(#{kafka_topic := KafkaTopic}, #{producers := ProducersMapping0} = Acc) -> + Producers = do_start_producer(KafkaClientId, KafkaTopic), + ProducersMapping = ProducersMapping0#{KafkaTopic => Producers}, + Acc#{producers := ProducersMapping} + end, + ProducersData0, + TopicMapping + ). + +do_start_producer(KafkaClientId, KafkaTopic) -> + Name = binary_to_atom(<>), ProducerConfig = #{ - name => test_producer, + name => Name, partitioner => roundrobin, partition_count_refresh_interval_seconds => 1_000, replayq_max_total_bytes => 10_000, @@ -396,14 +460,10 @@ start_producer(TestCase, Config) -> telemetry_meta_data => #{} }, {ok, Producers} = wolff:ensure_supervised_producers(KafkaClientId, KafkaTopic, ProducerConfig), - #{ - producers => Producers, - clients => Clients, - clientid => KafkaClientId - }. + Producers. -ensure_topic(Config) -> - KafkaTopic = ?config(kafka_topic, Config), +ensure_topics(Config) -> + TopicMapping = ?config(topic_mapping, Config), KafkaHost = ?config(kafka_host, Config), KafkaPort = ?config(kafka_port, Config), UseTLS = ?config(use_tls, Config), @@ -418,6 +478,7 @@ ensure_topic(Config) -> assignments => [], configs => [] } + || #{kafka_topic := KafkaTopic} <- TopicMapping ], RequestConfig = #{timeout => 5_000}, ConnConfig0 = @@ -464,8 +525,16 @@ shared_secret(rig_keytab) -> filename:join([shared_secret_path(), "rig.keytab"]). publish(Config, Messages) -> - Producers = ?config(kafka_producers, Config), - ct:pal("publishing: ~p", [Messages]), + %% pick the first topic if not specified + #{producers := ProducersMapping} = ?config(kafka_producers, Config), + [{KafkaTopic, Producers} | _] = maps:to_list(ProducersMapping), + ct:pal("publishing to ~p:\n ~p", [KafkaTopic, Messages]), + {_Partition, _OffsetReply} = wolff:send_sync(Producers, Messages, 10_000). + +publish(Config, KafkaTopic, Messages) -> + #{producers := ProducersMapping} = ?config(kafka_producers, Config), + #{KafkaTopic := Producers} = ProducersMapping, + ct:pal("publishing to ~p:\n ~p", [KafkaTopic, Messages]), {_Partition, _OffsetReply} = wolff:send_sync(Producers, Messages, 10_000). kafka_config(TestCase, _KafkaType, Config) -> @@ -480,7 +549,16 @@ kafka_config(TestCase, _KafkaType, Config) -> >>, MQTTTopic = proplists:get_value(mqtt_topic, Config, <<"mqtt/topic/", UniqueNum/binary>>), MQTTQoS = proplists:get_value(mqtt_qos, Config, 0), - MQTTPayload = proplists:get_value(mqtt_payload, Config, full_message), + DefaultTopicMapping = [ + #{ + kafka_topic => KafkaTopic, + mqtt_topic => MQTTTopic, + qos => MQTTQoS, + payload_template => <<"${.}">> + } + ], + TopicMapping0 = proplists:get_value(topic_mapping, Config, DefaultTopicMapping), + TopicMappingStr = topic_mapping(TopicMapping0), ConfigString = io_lib:format( "bridges.kafka_consumer.~s {\n" @@ -491,18 +569,15 @@ kafka_config(TestCase, _KafkaType, Config) -> " metadata_request_timeout = 5s\n" "~s" " kafka {\n" - " topic = ~s\n" " max_batch_bytes = 896KB\n" " max_rejoin_attempts = 5\n" " offset_commit_interval_seconds = 3\n" %% todo: matrix this " offset_reset_policy = reset_to_latest\n" " }\n" - " mqtt {\n" - " topic = \"~s\"\n" - " qos = ~b\n" - " payload = ~p\n" - " }\n" + "~s" + " key_encoding_mode = force_utf8\n" + " value_encoding_mode = force_utf8\n" " ssl {\n" " enable = ~p\n" " verify = verify_none\n" @@ -514,15 +589,35 @@ kafka_config(TestCase, _KafkaType, Config) -> KafkaHost, KafkaPort, authentication(AuthType), - KafkaTopic, - MQTTTopic, - MQTTQoS, - MQTTPayload, + TopicMappingStr, UseTLS ] ), {Name, ConfigString, parse_and_check(ConfigString, Name)}. +topic_mapping(TopicMapping0) -> + Template0 = << + "{kafka_topic = \"{{ kafka_topic }}\"," + " mqtt_topic = \"{{ mqtt_topic }}\"," + " qos = {{ qos }}," + " payload_template = \"{{{ payload_template }}}\" }" + >>, + Template = bbmustache:parse_binary(Template0), + Entries = + lists:map( + fun(Params) -> + bbmustache:compile(Template, Params, [{key_type, atom}]) + end, + TopicMapping0 + ), + iolist_to_binary( + [ + " topic_mapping = [", + lists:join(<<",\n">>, Entries), + "]\n" + ] + ). + authentication(Type) when Type =:= scram_sha_256; Type =:= scram_sha_512; @@ -578,6 +673,29 @@ delete_all_bridges() -> emqx_bridge:list() ). +create_bridge_api(Config) -> + create_bridge_api(Config, _Overrides = #{}). + +create_bridge_api(Config, Overrides) -> + TypeBin = ?BRIDGE_TYPE_BIN, + Name = ?config(kafka_name, Config), + KafkaConfig0 = ?config(kafka_config, Config), + KafkaConfig = emqx_map_lib:deep_merge(KafkaConfig0, Overrides), + Params = KafkaConfig#{<<"type">> => TypeBin, <<"name">> => Name}, + Path = emqx_mgmt_api_test_util:api_path(["bridges"]), + AuthHeader = emqx_mgmt_api_test_util:auth_header_(), + Opts = #{return_all => true}, + ct:pal("creating bridge (via http): ~p", [Params]), + Res = + case emqx_mgmt_api_test_util:request_api(post, Path, "", AuthHeader, Params, Opts) of + {ok, {Status, Headers, Body0}} -> + {ok, {Status, Headers, emqx_json:decode(Body0, [return_maps])}}; + Error -> + Error + end, + ct:pal("bridge create result: ~p", [Res]), + Res. + update_bridge_api(Config) -> update_bridge_api(Config, _Overrides = #{}). @@ -702,15 +820,24 @@ wait_until_subscribers_are_ready(N, Timeout) -> %% flaky about when they decide truly consuming the messages... %% `Period' should be greater than the `sleep_timeout' of the consumer %% (default 1 s). -ping_until_healthy(_Config, _Period, Timeout) when Timeout =< 0 -> - ct:fail("kafka subscriber did not stabilize!"); ping_until_healthy(Config, Period, Timeout) -> + #{producers := ProducersMapping} = ?config(kafka_producers, Config), + [KafkaTopic | _] = maps:keys(ProducersMapping), + ping_until_healthy(Config, KafkaTopic, Period, Timeout). + +ping_until_healthy(_Config, _KafkaTopic, _Period, Timeout) when Timeout =< 0 -> + ct:fail("kafka subscriber did not stabilize!"); +ping_until_healthy(Config, KafkaTopic, Period, Timeout) -> TimeA = erlang:monotonic_time(millisecond), Payload = emqx_guid:to_hexstr(emqx_guid:gen()), - publish(Config, [#{key => <<"probing">>, value => Payload}]), + publish(Config, KafkaTopic, [#{key => <<"probing">>, value => Payload}]), Res = ?block_until( - #{?snk_kind := kafka_consumer_handle_message, ?snk_span := {complete, _}}, + #{ + ?snk_kind := kafka_consumer_handle_message, + ?snk_span := {complete, _}, + message := #kafka_message{value = Payload} + }, Period ), case Res of @@ -930,6 +1057,132 @@ t_start_and_consume_ok(Config) -> ), ok. +t_multiple_topic_mappings(Config) -> + TopicMapping = ?config(topic_mapping, Config), + MQTTTopics = [MQTTTopic || #{mqtt_topic := MQTTTopic} <- TopicMapping], + KafkaTopics = [KafkaTopic || #{kafka_topic := KafkaTopic} <- TopicMapping], + NumMQTTTopics = length(MQTTTopics), + NPartitions = ?config(num_partitions, Config), + ResourceId = resource_id(Config), + Payload = emqx_guid:to_hexstr(emqx_guid:gen()), + ?check_trace( + begin + ?assertMatch( + {ok, {{_, 201, _}, _, _}}, + create_bridge_api(Config) + ), + wait_until_subscribers_are_ready(NPartitions, 40_000), + lists:foreach( + fun(KafkaTopic) -> + ping_until_healthy(Config, KafkaTopic, _Period = 1_500, _Timeout = 24_000) + end, + KafkaTopics + ), + + {ok, C} = emqtt:start_link([{proto_ver, v5}]), + on_exit(fun() -> emqtt:stop(C) end), + {ok, _} = emqtt:connect(C), + lists:foreach( + fun(MQTTTopic) -> + %% we use the hightest QoS so that we can check what + %% the subscription was. + QoS2Granted = 2, + {ok, _, [QoS2Granted]} = emqtt:subscribe(C, MQTTTopic, ?QOS_2) + end, + MQTTTopics + ), + + {ok, SRef0} = + snabbkaffe:subscribe( + ?match_event(#{ + ?snk_kind := kafka_consumer_handle_message, ?snk_span := {complete, _} + }), + NumMQTTTopics, + _Timeout0 = 20_000 + ), + lists:foreach( + fun(KafkaTopic) -> + publish(Config, KafkaTopic, [ + #{ + key => <<"mykey">>, + value => Payload, + headers => [{<<"hkey">>, <<"hvalue">>}] + } + ]) + end, + KafkaTopics + ), + {ok, _} = snabbkaffe:receive_events(SRef0), + + %% Check that the bridge probe API doesn't leak atoms. + ProbeRes = probe_bridge_api(Config), + ?assertMatch({ok, {{_, 204, _}, _Headers, _Body}}, ProbeRes), + AtomsBefore = erlang:system_info(atom_count), + %% Probe again; shouldn't have created more atoms. + ?assertMatch({ok, {{_, 204, _}, _Headers, _Body}}, ProbeRes), + AtomsAfter = erlang:system_info(atom_count), + ?assertEqual(AtomsBefore, AtomsAfter), + + ok + end, + fun(Trace) -> + %% two messages processed with begin/end events + ?assertMatch([_, _, _, _ | _], ?of_kind(kafka_consumer_handle_message, Trace)), + Published = receive_published(#{n => NumMQTTTopics}), + lists:foreach( + fun( + #{ + mqtt_topic := MQTTTopic, + qos := MQTTQoS + } + ) -> + [Msg] = [ + Msg + || Msg = #{topic := T} <- Published, + T =:= MQTTTopic + ], + ?assertMatch( + #{ + qos := MQTTQoS, + topic := MQTTTopic, + payload := _ + }, + Msg + ) + end, + TopicMapping + ), + %% check that we observed the different payload templates + %% as configured. + Payloads = + lists:sort([ + case emqx_json:safe_decode(P, [return_maps]) of + {ok, Decoded} -> Decoded; + {error, _} -> P + end + || #{payload := P} <- Published + ]), + ?assertMatch( + [ + #{ + <<"headers">> := #{<<"hkey">> := <<"hvalue">>}, + <<"key">> := <<"mykey">>, + <<"offset">> := Offset, + <<"topic">> := KafkaTopic, + <<"ts">> := TS, + <<"ts_type">> := <<"create">>, + <<"value">> := Payload + }, + <<"v = ", Payload/binary>> + ] when is_integer(Offset) andalso is_integer(TS) andalso is_binary(KafkaTopic), + Payloads + ), + ?assertEqual(2, emqx_resource_metrics:received_get(ResourceId)), + ok + end + ), + ok. + t_on_get_status(Config) -> case proplists:get_bool(skip_does_not_apply, Config) of true ->