feat(kafka_consumer): add mqtt topic placeholder support

Fixes https://emqx.atlassian.net/browse/EMQX-10678
This commit is contained in:
Thales Macedo Garitezi 2023-08-07 11:39:05 -03:00
parent a0467fa298
commit 9900a32850
5 changed files with 102 additions and 8 deletions

View File

@ -1,7 +1,7 @@
%% -*- mode: erlang -*-
{application, emqx_bridge_kafka, [
{description, "EMQX Enterprise Kafka Bridge"},
{vsn, "0.1.6"},
{vsn, "0.1.7"},
{registered, [emqx_bridge_kafka_consumer_sup]},
{applications, [
kernel,

View File

@ -125,7 +125,7 @@ values(consumer) ->
topic_mapping => [
#{
kafka_topic => <<"kafka-topic-1">>,
mqtt_topic => <<"mqtt/topic/1">>,
mqtt_topic => <<"mqtt/topic/${.offset}">>,
qos => 1,
payload_template => <<"${.}">>
},

View File

@ -69,7 +69,7 @@
topic_mapping := #{
kafka_topic() := #{
payload_template := emqx_placeholder:tmpl_token(),
mqtt_topic => emqx_types:topic(),
mqtt_topic_template => emqx_placeholder:tmpl_token(),
qos => emqx_types:qos()
}
},
@ -83,7 +83,7 @@
topic_mapping := #{
kafka_topic() := #{
payload_template := emqx_placeholder:tmpl_token(),
mqtt_topic => emqx_types:topic(),
mqtt_topic_template => emqx_placeholder:tmpl_token(),
qos => emqx_types:qos()
}
},
@ -235,7 +235,7 @@ do_handle_message(Message, State) ->
value_encoding_mode := ValueEncodingMode
} = State,
#{
mqtt_topic := MQTTTopic,
mqtt_topic_template := MQTTTopicTemplate,
qos := MQTTQoS,
payload_template := PayloadTemplate
} = maps:get(KafkaTopic, TopicMapping),
@ -249,6 +249,7 @@ do_handle_message(Message, State) ->
value => encode(Message#kafka_message.value, ValueEncodingMode)
},
Payload = render(FullMessage, PayloadTemplate),
MQTTTopic = render(FullMessage, MQTTTopicTemplate),
MQTTMessage = emqx_message:make(ResourceId, MQTTQoS, MQTTTopic, Payload),
_ = emqx:publish(MQTTMessage),
emqx:run_hook(Hookpoint, [FullMessage]),
@ -533,15 +534,16 @@ convert_topic_mapping(TopicMappingList) ->
fun(Fields, Acc) ->
#{
kafka_topic := KafkaTopic,
mqtt_topic := MQTTTopic,
mqtt_topic := MQTTTopicTemplate0,
qos := QoS,
payload_template := PayloadTemplate0
} = Fields,
PayloadTemplate = emqx_placeholder:preproc_tmpl(PayloadTemplate0),
MQTTTopicTemplate = emqx_placeholder:preproc_tmpl(MQTTTopicTemplate0),
Acc#{
KafkaTopic => #{
payload_template => PayloadTemplate,
mqtt_topic => MQTTTopic,
mqtt_topic_template => MQTTTopicTemplate,
qos => QoS
}
}

View File

@ -60,6 +60,7 @@ only_once_tests() ->
t_node_joins_existing_cluster,
t_cluster_node_down,
t_multiple_topic_mappings,
t_dynamic_mqtt_topic,
t_resource_manager_crash_after_subscriber_started,
t_resource_manager_crash_before_subscriber_started
].
@ -329,6 +330,23 @@ init_per_testcase(t_multiple_topic_mappings = TestCase, Config0) ->
],
Config = [{topic_mapping, TopicMapping} | Config0],
common_init_per_testcase(TestCase, Config);
init_per_testcase(t_dynamic_mqtt_topic = TestCase, Config0) ->
KafkaTopic =
<<
(atom_to_binary(TestCase))/binary,
(integer_to_binary(erlang:unique_integer()))/binary
>>,
TopicMapping =
[
#{
kafka_topic => KafkaTopic,
mqtt_topic => <<"${.topic}/${.value}/${.headers.hkey}">>,
qos => 1,
payload_template => <<"${.}">>
}
],
Config = [{kafka_topic, KafkaTopic}, {topic_mapping, TopicMapping} | Config0],
common_init_per_testcase(TestCase, Config);
init_per_testcase(TestCase, Config) ->
common_init_per_testcase(TestCase, Config).
@ -336,11 +354,12 @@ common_init_per_testcase(TestCase, Config0) ->
ct:timetrap(timer:seconds(60)),
delete_all_bridges(),
emqx_config:delete_override_conf_files(),
KafkaTopic =
KafkaTopic0 =
<<
(atom_to_binary(TestCase))/binary,
(integer_to_binary(erlang:unique_integer()))/binary
>>,
KafkaTopic = proplists:get_value(kafka_topic, Config0, KafkaTopic0),
KafkaType = ?config(kafka_type, Config0),
UniqueNum = integer_to_binary(erlang:unique_integer()),
MQTTTopic = proplists:get_value(mqtt_topic, Config0, <<"mqtt/topic/", UniqueNum/binary>>),
@ -1674,6 +1693,78 @@ t_bridge_rule_action_source(Config) ->
),
ok.
t_dynamic_mqtt_topic(Config) ->
KafkaTopic = ?config(kafka_topic, Config),
NPartitions = ?config(num_partitions, Config),
ResourceId = resource_id(Config),
Payload = emqx_guid:to_hexstr(emqx_guid:gen()),
MQTTTopic = emqx_topic:join([KafkaTopic, '#']),
?check_trace(
begin
?assertMatch(
{ok, _},
create_bridge(Config)
),
wait_until_subscribers_are_ready(NPartitions, 40_000),
{ok, C} = emqtt:start_link(),
on_exit(fun() -> emqtt:stop(C) end),
{ok, _} = emqtt:connect(C),
{ok, _, [0]} = emqtt:subscribe(C, MQTTTopic),
ct:pal("subscribed to ~p", [MQTTTopic]),
{ok, SRef0} = snabbkaffe:subscribe(
?match_event(#{
?snk_kind := kafka_consumer_handle_message, ?snk_span := {complete, _}
}),
_NumMsgs = 3,
20_000
),
{_Partition, _OffsetReply} =
publish(Config, [
%% this will have the last segment defined
#{
key => <<"mykey">>,
value => Payload,
headers => [{<<"hkey">>, <<"hvalue">>}]
},
%% this will not
#{
key => <<"mykey">>,
value => Payload
},
%% will inject an invalid topic segment
#{
key => <<"mykey">>,
value => <<"+">>
}
]),
{ok, _} = snabbkaffe:receive_events(SRef0),
ok
end,
fun(Trace) ->
?assertMatch([_Enter, _Complete | _], ?of_kind(kafka_consumer_handle_message, Trace)),
%% the message with invalid topic will fail to be published
Published = receive_published(#{n => 2}),
ExpectedMQTTTopic0 = emqx_topic:join([KafkaTopic, Payload, <<"hvalue">>]),
ExpectedMQTTTopic1 = emqx_topic:join([KafkaTopic, Payload, <<>>]),
?assertMatch(
[
#{
topic := ExpectedMQTTTopic0
},
#{
topic := ExpectedMQTTTopic1
}
],
Published
),
?assertEqual(3, emqx_resource_metrics:received_get(ResourceId)),
?assertError({timeout, _}, receive_published(#{timeout => 500})),
ok
end
),
ok.
%% checks that an existing cluster can be configured with a kafka
%% consumer bridge and that the consumers will distribute over the two
%% nodes.

View File

@ -0,0 +1 @@
Added support for using placeholders to define MQTT Topic in Kafka Consumer bridge topic mappings.