diff --git a/lib-ee/emqx_ee_bridge/i18n/emqx_ee_bridge_kafka.conf b/lib-ee/emqx_ee_bridge/i18n/emqx_ee_bridge_kafka.conf
index ed88a1e0d..2e4a58c7f 100644
--- a/lib-ee/emqx_ee_bridge/i18n/emqx_ee_bridge_kafka.conf
+++ b/lib-ee/emqx_ee_bridge/i18n/emqx_ee_bridge_kafka.conf
@@ -633,10 +633,12 @@ emqx_ee_bridge_kafka {
desc {
en: "Defines how the key or value from the Kafka message is"
" dealt with before being forwarded via MQTT.\n"
- "force_utf8
Uses UTF-8 encoding directly from the original message.\n"
+ "none
Uses the key or value from the Kafka message unchanged."
+ " Note: in this case, then the key or value must be a valid UTF-8 string.\n"
"base64
Uses base-64 encoding on the received key or value."
zh: "定义了在通过MQTT转发之前如何处理Kafka消息的键或值。"
- "force_utf8
直接使用原始信息的UTF-8编码。\n"
+ "none
使用Kafka消息中的键或值,不改变。"
+ " 注意:在这种情况下,那么键或值必须是一个有效的UTF-8字符串。\n"
"base64
对收到的密钥或值使用base-64编码。"
}
label {
diff --git a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_kafka.erl b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_kafka.erl
index 943009945..8e9ff9628 100644
--- a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_kafka.erl
+++ b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_kafka.erl
@@ -109,7 +109,7 @@ values(consumer) ->
offset_reset_policy => <<"reset_to_latest">>,
offset_commit_interval_seconds => 5
},
- key_encoding_mode => <<"force_utf8">>,
+ key_encoding_mode => <<"none">>,
topic_mapping => [
#{
kafka_topic => <<"kafka-topic-1">>,
@@ -124,7 +124,7 @@ values(consumer) ->
payload_template => <<"v = ${.value}">>
}
],
- value_encoding_mode => <<"force_utf8">>
+ value_encoding_mode => <<"none">>
}.
%% -------------------------------------------------------------------------------------------------
@@ -334,22 +334,16 @@ fields(consumer_opts) ->
#{
required => true,
desc => ?DESC(consumer_topic_mapping),
- validator =>
- fun
- ([]) ->
- {error, "There must be at least one Kafka-MQTT topic mapping"};
- ([_ | _]) ->
- ok
- end
+ validator => fun consumer_topic_mapping_validator/1
}
)},
{key_encoding_mode,
- mk(enum([force_utf8, base64]), #{
- default => force_utf8, desc => ?DESC(consumer_encoding_mode)
+ mk(enum([none, base64]), #{
+ default => none, desc => ?DESC(consumer_encoding_mode)
})},
{value_encoding_mode,
- mk(enum([force_utf8, base64]), #{
- default => force_utf8, desc => ?DESC(consumer_encoding_mode)
+ mk(enum([none, base64]), #{
+ default => none, desc => ?DESC(consumer_encoding_mode)
})}
];
fields(consumer_topic_mapping) ->
@@ -449,3 +443,16 @@ kafka_producer_converter(
kafka_producer_converter(Config, _HoconOpts) ->
%% new schema
Config.
+
+consumer_topic_mapping_validator(_TopicMapping = []) ->
+ {error, "There must be at least one Kafka-MQTT topic mapping"};
+consumer_topic_mapping_validator(TopicMapping = [_ | _]) ->
+ NumEntries = length(TopicMapping),
+ KafkaTopics = [KT || #{<<"kafka_topic">> := KT} <- TopicMapping],
+ DistinctKafkaTopics = length(lists:usort(KafkaTopics)),
+ case DistinctKafkaTopics =:= NumEntries of
+ true ->
+ ok;
+ false ->
+ {error, "Kafka topics must not be repeated in a bridge"}
+ end.
diff --git a/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_consumer.erl b/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_consumer.erl
index 99877ca8e..6abd3ed02 100644
--- a/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_consumer.erl
+++ b/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_consumer.erl
@@ -61,7 +61,7 @@
}.
-type offset_reset_policy() :: reset_to_latest | reset_to_earliest | reset_by_subscriber.
%% -type mqtt_payload() :: full_message | message_value.
--type encoding_mode() :: force_utf8 | base64.
+-type encoding_mode() :: none | base64.
-type consumer_init_data() :: #{
hookpoint := binary(),
key_encoding_mode := encoding_mode(),
@@ -490,7 +490,7 @@ render(FullMessage, PayloadTemplate) ->
},
emqx_plugin_libs_rule:proc_tmpl(PayloadTemplate, FullMessage, Opts).
-encode(Value, force_utf8) ->
+encode(Value, none) ->
Value;
encode(Value, base64) ->
base64:encode(Value).
diff --git a/lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_consumer_SUITE.erl b/lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_consumer_SUITE.erl
index 129011862..cb984fcf6 100644
--- a/lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_consumer_SUITE.erl
+++ b/lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_consumer_SUITE.erl
@@ -576,8 +576,8 @@ kafka_config(TestCase, _KafkaType, Config) ->
" offset_reset_policy = reset_to_latest\n"
" }\n"
"~s"
- " key_encoding_mode = force_utf8\n"
- " value_encoding_mode = force_utf8\n"
+ " key_encoding_mode = none\n"
+ " value_encoding_mode = none\n"
" ssl {\n"
" enable = ~p\n"
" verify = verify_none\n"
diff --git a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_kafka_tests.erl b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_kafka_tests.erl
index 47c21b673..72096c7b1 100644
--- a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_kafka_tests.erl
+++ b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_kafka_tests.erl
@@ -76,6 +76,68 @@ kafka_producer_test() ->
ok.
+kafka_consumer_test() ->
+ Conf1 = parse(kafka_consumer_hocon()),
+ ?assertMatch(
+ #{
+ <<"bridges">> :=
+ #{
+ <<"kafka_consumer">> :=
+ #{
+ <<"my_consumer">> := _
+ }
+ }
+ },
+ check(Conf1)
+ ),
+
+ %% Bad: can't repeat kafka topics.
+ BadConf1 = emqx_map_lib:deep_put(
+ [<<"bridges">>, <<"kafka_consumer">>, <<"my_consumer">>, <<"topic_mapping">>],
+ Conf1,
+ [
+ #{
+ <<"kafka_topic">> => <<"t1">>,
+ <<"mqtt_topic">> => <<"mqtt/t1">>,
+ <<"qos">> => 1,
+ <<"payload_template">> => <<"${.}">>
+ },
+ #{
+ <<"kafka_topic">> => <<"t1">>,
+ <<"mqtt_topic">> => <<"mqtt/t2">>,
+ <<"qos">> => 2,
+ <<"payload_template">> => <<"v = ${.value}">>
+ }
+ ]
+ ),
+ ?assertThrow(
+ {_, [
+ #{
+ path := "bridges.kafka_consumer.my_consumer.topic_mapping",
+ reason := "Kafka topics must not be repeated in a bridge"
+ }
+ ]},
+ check(BadConf1)
+ ),
+
+ %% Bad: there must be at least 1 mapping.
+ BadConf2 = emqx_map_lib:deep_put(
+ [<<"bridges">>, <<"kafka_consumer">>, <<"my_consumer">>, <<"topic_mapping">>],
+ Conf1,
+ []
+ ),
+ ?assertThrow(
+ {_, [
+ #{
+ path := "bridges.kafka_consumer.my_consumer.topic_mapping",
+ reason := "There must be at least one Kafka-MQTT topic mapping"
+ }
+ ]},
+ check(BadConf2)
+ ),
+
+ ok.
+
%%===========================================================================
%% Helper functions
%%===========================================================================
@@ -179,3 +241,47 @@ kafka_producer_new_hocon() ->
" }\n"
"}\n"
"".
+
+%% erlfmt-ignore
+kafka_consumer_hocon() ->
+"""
+bridges.kafka_consumer.my_consumer {
+ enable = true
+ bootstrap_hosts = \"kafka-1.emqx.net:9292\"
+ connect_timeout = 5s
+ min_metadata_refresh_interval = 3s
+ metadata_request_timeout = 5s
+ authentication = {
+ mechanism = plain
+ username = emqxuser
+ password = password
+ }
+ kafka {
+ max_batch_bytes = 896KB
+ max_rejoin_attempts = 5
+ offset_commit_interval_seconds = 3
+ offset_reset_policy = reset_to_latest
+ }
+ topic_mapping = [
+ {
+ kafka_topic = \"kafka-topic-1\"
+ mqtt_topic = \"mqtt/topic/1\"
+ qos = 1
+ payload_template = \"${.}\"
+ },
+ {
+ kafka_topic = \"kafka-topic-2\"
+ mqtt_topic = \"mqtt/topic/2\"
+ qos = 2
+ payload_template = \"v = ${.value}\"
+ }
+ ]
+ key_encoding_mode = none
+ value_encoding_mode = none
+ ssl {
+ enable = false
+ verify = verify_none
+ server_name_indication = \"auto\"
+ }
+}
+""".