feat(kafka_consumer): support multiple topic mappings, payload templates and key/value encodings

Added after feedback from the product team.

Thales Macedo Garitezi  2023-03-10 10:28:06 -03:00
parent f31f15e44e
commit 53979b6261

4 changed files with 509 additions and 126 deletions
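
For reviewers, a sketch of what the parsed consumer config now looks like when it reaches the connector (field names come from the schema changes below; the topic names, templates and encodings are illustrative, not part of this commit):

    %% Illustrative only: `topic_mapping' carries one entry per
    %% Kafka-to-MQTT route, and the two encoding modes apply to the
    %% message key and value respectively.
    ExampleConsumerConfig = #{
        topic_mapping => [
            #{
                kafka_topic => <<"telemetry-raw">>,
                mqtt_topic => <<"kafka/telemetry">>,
                qos => 1,
                payload_template => <<"${.}">>
            },
            #{
                kafka_topic => <<"telemetry-values">>,
                mqtt_topic => <<"kafka/values">>,
                qos => 2,
                payload_template => <<"v = ${.value}">>
            }
        ],
        key_encoding_mode => force_utf8,
        value_encoding_mode => base64
    }.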

View File

@@ -536,17 +536,33 @@ emqx_ee_bridge_kafka {
         }
     }
     consumer_mqtt_payload {
         desc {
-            en: "The payload of the MQTT message to be published.\n"
-            "<code>full_message</code> will encode available Kafka message attributes as a JSON object, including Key, Value, Timestamp and Headers"
-            "<code>message_value</code> will directly use the Kafka message value as the "
-            "MQTT message payload."
-            zh: "要发布的MQTT消息的有效载荷。"
-            "<code>full_message</code> 将把所有可用数据编码为 JSON 对象,包括 Key、Value、Timestamp 和 Headers。"
-            "<code>message_value</code> 将直接使用 Kafka 消息值作为 MQTT 消息的 Payload。"
+            en: "The template for transforming the incoming Kafka message."
+            " By default, it will use JSON format to serialize all visible"
+            " inputs from the Kafka message. Such fields are:\n"
+            "<code>headers</code>: an object containing string key-value pairs.\n"
+            "<code>key</code>: Kafka message key (uses the chosen key encoding).\n"
+            "<code>offset</code>: offset for the message.\n"
+            "<code>topic</code>: Kafka topic.\n"
+            "<code>ts</code>: message timestamp.\n"
+            "<code>ts_type</code>: message timestamp type, which is one of"
+            " <code>create</code>, <code>append</code> or <code>undefined</code>.\n"
+            "<code>value</code>: Kafka message value (uses the chosen value encoding).\n"
+            zh: "用于转换传入的 Kafka 消息的模板。"
+            "默认情况下,它将使用 JSON 格式序列化 Kafka 消息中所有可见的输入。"
+            "这些字段包括:\n"
+            "<code>headers</code>: 一个包含字符串键值对的对象。\n"
+            "<code>key</code>: Kafka 消息的键(使用所选的键编码)。\n"
+            "<code>offset</code>: 消息的偏移量。\n"
+            "<code>topic</code>: Kafka 主题。\n"
+            "<code>ts</code>: 消息的时间戳。\n"
+            "<code>ts_type</code>: 消息的时间戳类型,取值为"
+            " <code>create</code>、<code>append</code> 或 <code>undefined</code> 之一。\n"
+            "<code>value</code>: Kafka 消息的值(使用所选的值编码)。\n"
         }
         label {
-            en: "MQTT Payload"
-            zh: "MQTT Payload"
+            en: "MQTT Payload Template"
+            zh: "MQTT Payload Template"
         }
     }
     consumer_kafka_topic {
@@ -603,4 +619,29 @@ emqx_ee_bridge_kafka {
             zh: "偏移承诺间隔"
         }
     }
+    consumer_topic_mapping {
+        desc {
+            en: "Defines the mapping between Kafka topics and MQTT topics. Must contain at least one item."
+            zh: "定义 Kafka 主题和 MQTT 主题之间的映射。必须至少包含一个条目。"
+        }
+        label {
+            en: "Topic Mapping"
+            zh: "主题映射"
+        }
+    }
+    consumer_encoding_mode {
+        desc {
+            en: "Defines how the key or value from the Kafka message is"
+            " dealt with before being forwarded via MQTT.\n"
+            "<code>force_utf8</code> Uses UTF-8 encoding directly from the original message.\n"
+            "<code>base64</code> Uses base-64 encoding on the received key or value."
+            zh: "定义在通过 MQTT 转发之前如何处理 Kafka 消息的键或值。\n"
+            "<code>force_utf8</code> 直接使用原始消息的 UTF-8 编码。\n"
+            "<code>base64</code> 对收到的键或值使用 base-64 编码。"
+        }
+        label {
+            en: "Encoding Mode"
+            zh: "编码模式"
+        }
+    }
 }
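
With the default "${.}" template, the MQTT payload is a JSON object carrying exactly the fields listed in the descriptor above. A sketch of what a subscriber would receive (all concrete values invented for illustration):

    %% Roughly the payload produced for the default template; `key' and
    %% `value' have already been run through the chosen encodings.
    ExamplePayload = emqx_json:encode(#{
        headers => #{<<"hkey">> => <<"hvalue">>},
        key => <<"mykey">>,
        offset => 42,
        topic => <<"my-kafka-topic">>,
        ts => 1678467000000,
        ts_type => create,
        value => <<"my payload">>
    }).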

View File

@@ -263,26 +263,44 @@ fields(producer_buffer) ->
 fields(consumer_opts) ->
     [
         {kafka,
-            mk(ref(consumer_kafka_opts), #{required => true, desc => ?DESC(consumer_kafka_opts)})},
-        {mqtt, mk(ref(consumer_mqtt_opts), #{required => true, desc => ?DESC(consumer_mqtt_opts)})}
-    ];
-fields(consumer_mqtt_opts) ->
-    [
-        {topic,
-            mk(binary(), #{
-                required => true,
-                desc => ?DESC(consumer_mqtt_topic)
-            })},
-        {qos, mk(emqx_schema:qos(), #{default => 0, desc => ?DESC(consumer_mqtt_qos)})},
-        {payload,
-            mk(
-                enum([full_message, message_value]),
-                #{default => full_message, desc => ?DESC(consumer_mqtt_payload)}
-            )}
+            mk(ref(consumer_kafka_opts), #{required => false, desc => ?DESC(consumer_kafka_opts)})},
+        {topic_mapping,
+            mk(
+                hoconsc:array(ref(consumer_topic_mapping)),
+                #{
+                    required => true,
+                    desc => ?DESC(consumer_topic_mapping),
+                    validator =>
+                        fun
+                            ([]) ->
+                                {error, "There must be at least one Kafka-MQTT topic mapping"};
+                            ([_ | _]) ->
+                                ok
+                        end
+                }
+            )},
+        {key_encoding_mode,
+            mk(enum([force_utf8, base64]), #{
+                default => force_utf8, desc => ?DESC(consumer_encoding_mode)
+            })},
+        {value_encoding_mode,
+            mk(enum([force_utf8, base64]), #{
+                default => force_utf8, desc => ?DESC(consumer_encoding_mode)
+            })}
+    ];
+fields(consumer_topic_mapping) ->
+    [
+        {kafka_topic, mk(binary(), #{required => true, desc => ?DESC(consumer_kafka_topic)})},
+        {mqtt_topic, mk(binary(), #{required => true, desc => ?DESC(consumer_mqtt_topic)})},
+        {qos, mk(emqx_schema:qos(), #{default => 0, desc => ?DESC(consumer_mqtt_qos)})},
+        {payload_template,
+            mk(
+                string(),
+                #{default => <<"${.}">>, desc => ?DESC(consumer_mqtt_payload)}
+            )}
     ];
 fields(consumer_kafka_opts) ->
     [
-        {topic, mk(binary(), #{desc => ?DESC(consumer_kafka_topic)})},
         {max_batch_bytes,
             mk(emqx_schema:bytesize(), #{
                 default => "896KB", desc => ?DESC(consumer_max_batch_bytes)
@@ -330,7 +348,7 @@ struct_names() ->
         producer_opts,
         consumer_opts,
         consumer_kafka_opts,
-        consumer_mqtt_opts
+        consumer_topic_mapping
     ].

 %% -------------------------------------------------------------------------------------------------

View File

@@ -41,31 +41,59 @@
         offset_reset_policy := offset_reset_policy(),
         topic := binary()
     },
-    mqtt := #{
-        topic := emqx_types:topic(),
-        qos := emqx_types:qos(),
-        payload := mqtt_payload()
-    },
+    topic_mapping := nonempty_list(
+        #{
+            kafka_topic := kafka_topic(),
+            mqtt_topic := emqx_types:topic(),
+            qos := emqx_types:qos(),
+            payload_template := string()
+        }
+    ),
     ssl := _,
     any() => term()
 }.
 -type subscriber_id() :: emqx_ee_bridge_kafka_consumer_sup:child_id().
+-type kafka_topic() :: brod:topic().
 -type state() :: #{
-    kafka_topic := binary(),
+    kafka_topics := nonempty_list(kafka_topic()),
     subscriber_id := subscriber_id(),
     kafka_client_id := brod:client_id()
 }.
 -type offset_reset_policy() :: reset_to_latest | reset_to_earliest | reset_by_subscriber.
--type mqtt_payload() :: full_message | message_value.
--type consumer_state() :: #{
-    resource_id := resource_id(),
-    mqtt := #{
-        payload := mqtt_payload(),
-        topic => emqx_types:topic(),
-        qos => emqx_types:qos()
-    },
+%% -type mqtt_payload() :: full_message | message_value.
+-type encoding_mode() :: force_utf8 | base64.
+-type consumer_init_data() :: #{
     hookpoint := binary(),
-    kafka_topic := binary()
+    key_encoding_mode := encoding_mode(),
+    resource_id := resource_id(),
+    topic_mapping := #{
+        kafka_topic() := #{
+            payload_template := emqx_plugin_libs_rule:tmpl_token(),
+            mqtt_topic => emqx_types:topic(),
+            qos => emqx_types:qos()
+        }
+    },
+    value_encoding_mode := encoding_mode()
+}.
+-type consumer_state() :: #{
+    hookpoint := binary(),
+    kafka_topic := binary(),
+    key_encoding_mode := encoding_mode(),
+    resource_id := resource_id(),
+    topic_mapping := #{
+        kafka_topic() := #{
+            payload_template := emqx_plugin_libs_rule:tmpl_token(),
+            mqtt_topic => emqx_types:topic(),
+            qos => emqx_types:qos()
+        }
+    },
+    value_encoding_mode := encoding_mode()
+}.
+-type subscriber_init_info() :: #{
+    topic => brod:topic(),
+    partition => brod:partition(),
+    group_id => brod:group_id(),
+    commit_fun => brod_group_subscriber_v2:commit_fun()
 }.

 %%-------------------------------------------------------------------------------------
@@ -92,11 +120,10 @@ on_start(InstanceId, Config) ->
             max_batch_bytes := _,
             max_rejoin_attempts := _,
             offset_commit_interval_seconds := _,
-            offset_reset_policy := _,
-            topic := _
+            offset_reset_policy := _
         },
-        mqtt := #{topic := _, qos := _, payload := _},
-        ssl := SSL
+        ssl := SSL,
+        topic_mapping := _
     } = Config,
     BootstrapHosts = emqx_bridge_impl_kafka:hosts(BootstrapHosts0),
     KafkaType = kafka_consumer,
@@ -145,22 +172,19 @@ on_get_status(_InstanceID, State) ->
     #{
         subscriber_id := SubscriberId,
         kafka_client_id := ClientID,
-        kafka_topic := KafkaTopic
+        kafka_topics := KafkaTopics
     } = State,
-    case brod:get_partitions_count(ClientID, KafkaTopic) of
-        {ok, NPartitions} ->
-            do_get_status(ClientID, KafkaTopic, SubscriberId, NPartitions);
-        _ ->
-            disconnected
-    end.
+    do_get_status(ClientID, KafkaTopics, SubscriberId).

 %%-------------------------------------------------------------------------------------
 %% `brod_group_subscriber' API
 %%-------------------------------------------------------------------------------------
--spec init(_, consumer_state()) -> {ok, consumer_state()}.
-init(_GroupData, State) ->
-    ?tp(kafka_consumer_subscriber_init, #{group_data => _GroupData, state => State}),
+-spec init(subscriber_init_info(), consumer_init_data()) -> {ok, consumer_state()}.
+init(GroupData, State0) ->
+    ?tp(kafka_consumer_subscriber_init, #{group_data => GroupData, state => State0}),
+    #{topic := KafkaTopic} = GroupData,
+    State = State0#{kafka_topic => KafkaTopic},
     {ok, State}.

 -spec handle_message(#kafka_message{}, consumer_state()) -> {ok, commit, consumer_state()}.
@@ -173,33 +197,29 @@ handle_message(Message, State) ->
 do_handle_message(Message, State) ->
     #{
-        resource_id := ResourceId,
         hookpoint := Hookpoint,
         kafka_topic := KafkaTopic,
-        mqtt := #{
-            topic := MQTTTopic,
-            payload := MQTTPayload,
-            qos := MQTTQoS
-        }
+        key_encoding_mode := KeyEncodingMode,
+        resource_id := ResourceId,
+        topic_mapping := TopicMapping,
+        value_encoding_mode := ValueEncodingMode
     } = State,
+    #{
+        mqtt_topic := MQTTTopic,
+        qos := MQTTQoS,
+        payload_template := PayloadTemplate
+    } = maps:get(KafkaTopic, TopicMapping),
     FullMessage = #{
+        headers => maps:from_list(Message#kafka_message.headers),
+        key => encode(Message#kafka_message.key, KeyEncodingMode),
         offset => Message#kafka_message.offset,
-        key => Message#kafka_message.key,
-        value => Message#kafka_message.value,
+        topic => KafkaTopic,
         ts => Message#kafka_message.ts,
         ts_type => Message#kafka_message.ts_type,
-        headers => maps:from_list(Message#kafka_message.headers),
-        topic => KafkaTopic
+        value => encode(Message#kafka_message.value, ValueEncodingMode)
     },
-    Payload =
-        case MQTTPayload of
-            full_message ->
-                FullMessage;
-            message_value ->
-                Message#kafka_message.value
-        end,
-    EncodedPayload = emqx_json:encode(Payload),
-    MQTTMessage = emqx_message:make(ResourceId, MQTTQoS, MQTTTopic, EncodedPayload),
+    Payload = render(FullMessage, PayloadTemplate),
+    MQTTMessage = emqx_message:make(ResourceId, MQTTQoS, MQTTTopic, Payload),
     _ = emqx:publish(MQTTMessage),
     emqx:run_hook(Hookpoint, [FullMessage]),
     emqx_resource_metrics:received_inc(ResourceId),
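
To make the new flow concrete, here is a sketch of the render step for a non-default template such as <<"v = ${.value}">> (the one used by the test suite below); the message contents are invented, and render/2 and encode/2 are the helpers added later in this diff:

    Template = emqx_plugin_libs_rule:preproc_tmpl(<<"v = ${.value}">>),
    FullMessage = #{
        headers => #{},
        key => encode(<<"mykey">>, force_utf8),
        offset => 7,
        topic => <<"my-kafka-topic">>,
        ts => 1678467000000,
        ts_type => create,
        value => encode(<<"abc">>, force_utf8)
    },
    %% render/2 wraps proc_tmpl with a var_trans that turns `undefined'
    %% into an empty binary, so missing fields render as "".
    <<"v = abc">> = render(FullMessage, Template).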
@@ -251,21 +271,20 @@ start_consumer(Config, InstanceId, ClientID) ->
             max_batch_bytes := MaxBatchBytes,
             max_rejoin_attempts := MaxRejoinAttempts,
             offset_commit_interval_seconds := OffsetCommitInterval,
-            offset_reset_policy := OffsetResetPolicy,
-            topic := KafkaTopic
+            offset_reset_policy := OffsetResetPolicy
         },
-        mqtt := #{topic := MQTTTopic, qos := MQTTQoS, payload := MQTTPayload}
+        key_encoding_mode := KeyEncodingMode,
+        topic_mapping := TopicMapping0,
+        value_encoding_mode := ValueEncodingMode
     } = Config,
     ok = ensure_consumer_supervisor_started(),
+    TopicMapping = convert_topic_mapping(TopicMapping0),
     InitialState = #{
-        resource_id => emqx_bridge_resource:resource_id(kafka_consumer, BridgeName),
-        mqtt => #{
-            payload => MQTTPayload,
-            topic => MQTTTopic,
-            qos => MQTTQoS
-        },
+        key_encoding_mode => KeyEncodingMode,
         hookpoint => Hookpoint,
-        kafka_topic => KafkaTopic
+        resource_id => emqx_bridge_resource:resource_id(kafka_consumer, BridgeName),
+        topic_mapping => TopicMapping,
+        value_encoding_mode => ValueEncodingMode
     },
     %% note: the group id should be the same for all nodes in the
     %% cluster, so that the load gets distributed between all
@@ -279,11 +298,12 @@ start_consumer(Config, InstanceId, ClientID) ->
         {max_rejoin_attempts, MaxRejoinAttempts},
         {offset_commit_interval_seconds, OffsetCommitInterval}
     ],
+    KafkaTopics = maps:keys(TopicMapping),
     GroupSubscriberConfig =
         #{
             client => ClientID,
             group_id => GroupID,
-            topics => [KafkaTopic],
+            topics => KafkaTopics,
             cb_module => ?MODULE,
             init_data => InitialState,
             message_type => message,
@@ -304,14 +324,13 @@ start_consumer(Config, InstanceId, ClientID) ->
             {ok, #{
                 subscriber_id => SubscriberId,
                 kafka_client_id => ClientID,
-                kafka_topic => KafkaTopic
+                kafka_topics => KafkaTopics
             }};
         {error, Reason2} ->
             ?SLOG(error, #{
                 msg => "failed_to_start_kafka_consumer",
                 instance_id => InstanceId,
                 kafka_hosts => emqx_bridge_impl_kafka:hosts(BootstrapHosts0),
-                kafka_topic => KafkaTopic,
                 reason => emqx_misc:redact(Reason2)
             }),
             stop_client(ClientID),
@@ -344,6 +363,19 @@ stop_client(ClientID) ->
     ),
     ok.

+do_get_status(ClientID, [KafkaTopic | RestTopics], SubscriberId) ->
+    case brod:get_partitions_count(ClientID, KafkaTopic) of
+        {ok, NPartitions} ->
+            case do_get_status(ClientID, KafkaTopic, SubscriberId, NPartitions) of
+                connected -> do_get_status(ClientID, RestTopics, SubscriberId);
+                disconnected -> disconnected
+            end;
+        _ ->
+            disconnected
+    end;
+do_get_status(_ClientID, _KafkaTopics = [], _SubscriberId) ->
+    connected.
+
 -spec do_get_status(brod:client_id(), binary(), subscriber_id(), pos_integer()) ->
     connected | disconnected.
 do_get_status(ClientID, KafkaTopic, SubscriberId, NPartitions) ->
@@ -424,5 +456,44 @@ make_client_id(InstanceId, KafkaType, KafkaName) ->
             probing_brod_consumers
     end.

+convert_topic_mapping(TopicMappingList) ->
+    lists:foldl(
+        fun(Fields, Acc) ->
+            #{
+                kafka_topic := KafkaTopic,
+                mqtt_topic := MQTTTopic,
+                qos := QoS,
+                payload_template := PayloadTemplate0
+            } = Fields,
+            PayloadTemplate = emqx_plugin_libs_rule:preproc_tmpl(PayloadTemplate0),
+            Acc#{
+                KafkaTopic => #{
+                    payload_template => PayloadTemplate,
+                    mqtt_topic => MQTTTopic,
+                    qos => QoS
+                }
+            }
+        end,
+        #{},
+        TopicMappingList
+    ).
+
+render(FullMessage, PayloadTemplate) ->
+    Opts = #{
+        return => full_binary,
+        var_trans => fun
+            (undefined) ->
+                <<>>;
+            (X) ->
+                emqx_plugin_libs_rule:bin(X)
+        end
+    },
+    emqx_plugin_libs_rule:proc_tmpl(PayloadTemplate, FullMessage, Opts).
+
+encode(Value, force_utf8) ->
+    Value;
+encode(Value, base64) ->
+    base64:encode(Value).
+
 to_bin(B) when is_binary(B) -> B;
 to_bin(A) when is_atom(A) -> atom_to_binary(A, utf8).
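
A small usage sketch of the new helpers (topic names invented): convert_topic_mapping/1 pre-compiles each template once and keys the result by Kafka topic, so do_handle_message/2 only does a map lookup per message, and encode/2 is a pass-through unless base-64 was requested:

    %% Hypothetical shell session:
    TopicMapping = convert_topic_mapping([
        #{kafka_topic => <<"t1">>, mqtt_topic => <<"mqtt/t1">>,
          qos => 1, payload_template => <<"${.}">>}
    ]),
    #{<<"t1">> := #{mqtt_topic := <<"mqtt/t1">>, qos := 1}} = TopicMapping,
    %% force_utf8 leaves binaries untouched; base64 uses the standard
    %% OTP base64 module.
    <<"abc">> = encode(<<"abc">>, force_utf8),
    <<"YWJj">> = encode(<<"abc">>, base64).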

View File

@@ -10,6 +10,7 @@
 -include_lib("common_test/include/ct.hrl").
 -include_lib("snabbkaffe/include/snabbkaffe.hrl").
 -include_lib("brod/include/brod.hrl").
+-include_lib("emqx/include/emqx_mqtt.hrl").

 -import(emqx_common_test_helpers, [on_exit/1]).
@@ -53,7 +54,8 @@ sasl_only_tests() ->
 only_once_tests() ->
     [
         t_bridge_rule_action_source,
-        t_cluster_group
+        t_cluster_group,
+        t_multiple_topic_mappings
     ].

 init_per_suite(Config) ->
@@ -284,6 +286,32 @@ init_per_testcase(TestCase, Config) when
 init_per_testcase(t_cluster_group = TestCase, Config0) ->
     Config = emqx_misc:merge_opts(Config0, [{num_partitions, 6}]),
     common_init_per_testcase(TestCase, Config);
+init_per_testcase(t_multiple_topic_mappings = TestCase, Config0) ->
+    KafkaTopicBase =
+        <<
+            (atom_to_binary(TestCase))/binary,
+            (integer_to_binary(erlang:unique_integer()))/binary
+        >>,
+    MQTTTopicBase =
+        <<"mqtt/", (atom_to_binary(TestCase))/binary,
+            (integer_to_binary(erlang:unique_integer()))/binary, "/">>,
+    TopicMapping =
+        [
+            #{
+                kafka_topic => <<KafkaTopicBase/binary, "-1">>,
+                mqtt_topic => <<MQTTTopicBase/binary, "1">>,
+                qos => 1,
+                payload_template => <<"${.}">>
+            },
+            #{
+                kafka_topic => <<KafkaTopicBase/binary, "-2">>,
+                mqtt_topic => <<MQTTTopicBase/binary, "2">>,
+                qos => 2,
+                payload_template => <<"v = ${.value}">>
+            }
+        ],
+    Config = [{topic_mapping, TopicMapping} | Config0],
+    common_init_per_testcase(TestCase, Config);
 init_per_testcase(TestCase, Config) ->
     common_init_per_testcase(TestCase, Config).
@@ -295,23 +323,35 @@ common_init_per_testcase(TestCase, Config0) ->
             (atom_to_binary(TestCase))/binary,
             (integer_to_binary(erlang:unique_integer()))/binary
         >>,
-    Config = [{kafka_topic, KafkaTopic} | Config0],
-    KafkaType = ?config(kafka_type, Config),
+    KafkaType = ?config(kafka_type, Config0),
+    UniqueNum = integer_to_binary(erlang:unique_integer()),
+    MQTTTopic = proplists:get_value(mqtt_topic, Config0, <<"mqtt/topic/", UniqueNum/binary>>),
+    MQTTQoS = proplists:get_value(mqtt_qos, Config0, 0),
+    DefaultTopicMapping = [
+        #{
+            kafka_topic => KafkaTopic,
+            mqtt_topic => MQTTTopic,
+            qos => MQTTQoS,
+            payload_template => <<"${.}">>
+        }
+    ],
+    TopicMapping = proplists:get_value(topic_mapping, Config0, DefaultTopicMapping),
+    Config = [
+        {kafka_topic, KafkaTopic},
+        {topic_mapping, TopicMapping}
+        | Config0
+    ],
     {Name, ConfigString, KafkaConfig} = kafka_config(
         TestCase, KafkaType, Config
     ),
-    ensure_topic(Config),
-    #{
-        producers := Producers,
-        clientid := KafkaProducerClientId
-    } = start_producer(TestCase, Config),
+    ensure_topics(Config),
+    ProducersConfigs = start_producers(TestCase, Config),
     ok = snabbkaffe:start_trace(),
     [
         {kafka_name, Name},
         {kafka_config_string, ConfigString},
         {kafka_config, KafkaConfig},
-        {kafka_producers, Producers},
-        {kafka_producer_clientid, KafkaProducerClientId}
+        {kafka_producers, ProducersConfigs}
         | Config
     ].
@@ -323,11 +363,17 @@ end_per_testcase(_Testcase, Config) ->
         false ->
             ProxyHost = ?config(proxy_host, Config),
             ProxyPort = ?config(proxy_port, Config),
-            Producers = ?config(kafka_producers, Config),
-            KafkaProducerClientId = ?config(kafka_producer_clientid, Config),
+            ProducersConfigs = ?config(kafka_producers, Config),
             emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort),
             delete_all_bridges(),
-            ok = wolff:stop_and_delete_supervised_producers(Producers),
+            #{clientid := KafkaProducerClientId, producers := ProducersMapping} =
+                ProducersConfigs,
+            lists:foreach(
+                fun(Producers) ->
+                    ok = wolff:stop_and_delete_supervised_producers(Producers)
+                end,
+                maps:values(ProducersMapping)
+            ),
             ok = wolff:stop_and_delete_supervised_client(KafkaProducerClientId),
             emqx_common_test_helpers:call_janitor(),
             ok = snabbkaffe:stop(),
@@ -338,8 +384,8 @@ end_per_testcase(_Testcase, Config) ->
 %% Helper fns
 %%------------------------------------------------------------------------------

-start_producer(TestCase, Config) ->
-    KafkaTopic = ?config(kafka_topic, Config),
+start_producers(TestCase, Config) ->
+    TopicMapping = ?config(topic_mapping, Config),
     KafkaClientId =
         <<"test-client-", (atom_to_binary(TestCase))/binary,
             (integer_to_binary(erlang:unique_integer()))/binary>>,
@@ -381,9 +427,27 @@ start_producer(TestCase, Config) ->
         ssl => SSL
     },
     {ok, Clients} = wolff:ensure_supervised_client(KafkaClientId, Hosts, ClientConfig),
+    ProducersData0 =
+        #{
+            clients => Clients,
+            clientid => KafkaClientId,
+            producers => #{}
+        },
+    lists:foldl(
+        fun(#{kafka_topic := KafkaTopic}, #{producers := ProducersMapping0} = Acc) ->
+            Producers = do_start_producer(KafkaClientId, KafkaTopic),
+            ProducersMapping = ProducersMapping0#{KafkaTopic => Producers},
+            Acc#{producers := ProducersMapping}
+        end,
+        ProducersData0,
+        TopicMapping
+    ).
+
+do_start_producer(KafkaClientId, KafkaTopic) ->
+    Name = binary_to_atom(<<KafkaTopic/binary, "_test_producer">>),
     ProducerConfig =
         #{
-            name => test_producer,
+            name => Name,
             partitioner => roundrobin,
             partition_count_refresh_interval_seconds => 1_000,
             replayq_max_total_bytes => 10_000,
@@ -396,14 +460,10 @@ start_producer(TestCase, Config) ->
             telemetry_meta_data => #{}
         },
     {ok, Producers} = wolff:ensure_supervised_producers(KafkaClientId, KafkaTopic, ProducerConfig),
-    #{
-        producers => Producers,
-        clients => Clients,
-        clientid => KafkaClientId
-    }.
+    Producers.

-ensure_topic(Config) ->
-    KafkaTopic = ?config(kafka_topic, Config),
+ensure_topics(Config) ->
+    TopicMapping = ?config(topic_mapping, Config),
     KafkaHost = ?config(kafka_host, Config),
     KafkaPort = ?config(kafka_port, Config),
     UseTLS = ?config(use_tls, Config),
@@ -418,6 +478,7 @@ ensure_topic(Config) ->
             assignments => [],
             configs => []
         }
+        || #{kafka_topic := KafkaTopic} <- TopicMapping
     ],
     RequestConfig = #{timeout => 5_000},
     ConnConfig0 =
@@ -464,8 +525,16 @@ shared_secret(rig_keytab) ->
     filename:join([shared_secret_path(), "rig.keytab"]).

 publish(Config, Messages) ->
-    Producers = ?config(kafka_producers, Config),
-    ct:pal("publishing: ~p", [Messages]),
+    %% pick the first topic if not specified
+    #{producers := ProducersMapping} = ?config(kafka_producers, Config),
+    [{KafkaTopic, Producers} | _] = maps:to_list(ProducersMapping),
+    ct:pal("publishing to ~p:\n  ~p", [KafkaTopic, Messages]),
+    {_Partition, _OffsetReply} = wolff:send_sync(Producers, Messages, 10_000).
+
+publish(Config, KafkaTopic, Messages) ->
+    #{producers := ProducersMapping} = ?config(kafka_producers, Config),
+    #{KafkaTopic := Producers} = ProducersMapping,
+    ct:pal("publishing to ~p:\n  ~p", [KafkaTopic, Messages]),
     {_Partition, _OffsetReply} = wolff:send_sync(Producers, Messages, 10_000).

 kafka_config(TestCase, _KafkaType, Config) ->
@@ -480,7 +549,16 @@ kafka_config(TestCase, _KafkaType, Config) ->
         >>,
     MQTTTopic = proplists:get_value(mqtt_topic, Config, <<"mqtt/topic/", UniqueNum/binary>>),
     MQTTQoS = proplists:get_value(mqtt_qos, Config, 0),
-    MQTTPayload = proplists:get_value(mqtt_payload, Config, full_message),
+    DefaultTopicMapping = [
+        #{
+            kafka_topic => KafkaTopic,
+            mqtt_topic => MQTTTopic,
+            qos => MQTTQoS,
+            payload_template => <<"${.}">>
+        }
+    ],
+    TopicMapping0 = proplists:get_value(topic_mapping, Config, DefaultTopicMapping),
+    TopicMappingStr = topic_mapping(TopicMapping0),
     ConfigString =
         io_lib:format(
             "bridges.kafka_consumer.~s {\n"
@@ -491,18 +569,15 @@ kafka_config(TestCase, _KafkaType, Config) ->
             "  metadata_request_timeout = 5s\n"
             "~s"
             "  kafka {\n"
-            "    topic = ~s\n"
             "    max_batch_bytes = 896KB\n"
             "    max_rejoin_attempts = 5\n"
             "    offset_commit_interval_seconds = 3\n"
             %% todo: matrix this
             "    offset_reset_policy = reset_to_latest\n"
             "  }\n"
-            "  mqtt {\n"
-            "    topic = \"~s\"\n"
-            "    qos = ~b\n"
-            "    payload = ~p\n"
-            "  }\n"
+            "~s"
+            "  key_encoding_mode = force_utf8\n"
+            "  value_encoding_mode = force_utf8\n"
             "  ssl {\n"
             "    enable = ~p\n"
             "    verify = verify_none\n"
@@ -514,15 +589,35 @@ kafka_config(TestCase, _KafkaType, Config) ->
                 KafkaHost,
                 KafkaPort,
                 authentication(AuthType),
-                KafkaTopic,
-                MQTTTopic,
-                MQTTQoS,
-                MQTTPayload,
+                TopicMappingStr,
                 UseTLS
             ]
         ),
     {Name, ConfigString, parse_and_check(ConfigString, Name)}.
+topic_mapping(TopicMapping0) ->
+    Template0 = <<
+        "{kafka_topic = \"{{ kafka_topic }}\","
+        " mqtt_topic = \"{{ mqtt_topic }}\","
+        " qos = {{ qos }},"
+        " payload_template = \"{{{ payload_template }}}\" }"
+    >>,
+    Template = bbmustache:parse_binary(Template0),
+    Entries =
+        lists:map(
+            fun(Params) ->
+                bbmustache:compile(Template, Params, [{key_type, atom}])
+            end,
+            TopicMapping0
+        ),
+    iolist_to_binary(
+        [
+            "  topic_mapping = [",
+            lists:join(<<",\n">>, Entries),
+            "]\n"
+        ]
+    ).
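
For reference, a sketch of what this helper emits for a single mapping entry (the entry values are invented, and the whitespace is approximate):

    %% One entry compiles to a single-line HOCON object inside
    %% `topic_mapping = [...]'.
    Entry = #{
        kafka_topic => <<"t1">>,
        mqtt_topic => <<"mqtt/t1">>,
        qos => 1,
        payload_template => <<"${.}">>
    },
    Hocon = topic_mapping([Entry]),
    %% Hocon is roughly:
    %%   <<"  topic_mapping = [{kafka_topic = \"t1\", mqtt_topic = \"mqtt/t1\","
    %%     " qos = 1, payload_template = \"${.}\" }]\n">>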
 authentication(Type) when
     Type =:= scram_sha_256;
     Type =:= scram_sha_512;
@@ -578,6 +673,29 @@ delete_all_bridges() ->
         emqx_bridge:list()
     ).

+create_bridge_api(Config) ->
+    create_bridge_api(Config, _Overrides = #{}).
+
+create_bridge_api(Config, Overrides) ->
+    TypeBin = ?BRIDGE_TYPE_BIN,
+    Name = ?config(kafka_name, Config),
+    KafkaConfig0 = ?config(kafka_config, Config),
+    KafkaConfig = emqx_map_lib:deep_merge(KafkaConfig0, Overrides),
+    Params = KafkaConfig#{<<"type">> => TypeBin, <<"name">> => Name},
+    Path = emqx_mgmt_api_test_util:api_path(["bridges"]),
+    AuthHeader = emqx_mgmt_api_test_util:auth_header_(),
+    Opts = #{return_all => true},
+    ct:pal("creating bridge (via http): ~p", [Params]),
+    Res =
+        case emqx_mgmt_api_test_util:request_api(post, Path, "", AuthHeader, Params, Opts) of
+            {ok, {Status, Headers, Body0}} ->
+                {ok, {Status, Headers, emqx_json:decode(Body0, [return_maps])}};
+            Error ->
+                Error
+        end,
+    ct:pal("bridge create result: ~p", [Res]),
+    Res.
+
 update_bridge_api(Config) ->
     update_bridge_api(Config, _Overrides = #{}).
@@ -702,15 +820,24 @@ wait_until_subscribers_are_ready(N, Timeout) ->
 %% flaky about when they decide truly consuming the messages...
 %% `Period' should be greater than the `sleep_timeout' of the consumer
 %% (default 1 s).
-ping_until_healthy(_Config, _Period, Timeout) when Timeout =< 0 ->
-    ct:fail("kafka subscriber did not stabilize!");
 ping_until_healthy(Config, Period, Timeout) ->
+    #{producers := ProducersMapping} = ?config(kafka_producers, Config),
+    [KafkaTopic | _] = maps:keys(ProducersMapping),
+    ping_until_healthy(Config, KafkaTopic, Period, Timeout).
+
+ping_until_healthy(_Config, _KafkaTopic, _Period, Timeout) when Timeout =< 0 ->
+    ct:fail("kafka subscriber did not stabilize!");
+ping_until_healthy(Config, KafkaTopic, Period, Timeout) ->
     TimeA = erlang:monotonic_time(millisecond),
     Payload = emqx_guid:to_hexstr(emqx_guid:gen()),
-    publish(Config, [#{key => <<"probing">>, value => Payload}]),
+    publish(Config, KafkaTopic, [#{key => <<"probing">>, value => Payload}]),
     Res =
         ?block_until(
-            #{?snk_kind := kafka_consumer_handle_message, ?snk_span := {complete, _}},
+            #{
+                ?snk_kind := kafka_consumer_handle_message,
+                ?snk_span := {complete, _},
+                message := #kafka_message{value = Payload}
+            },
            Period
        ),
    case Res of
@@ -930,6 +1057,132 @@ t_start_and_consume_ok(Config) ->
     ),
     ok.

+t_multiple_topic_mappings(Config) ->
+    TopicMapping = ?config(topic_mapping, Config),
+    MQTTTopics = [MQTTTopic || #{mqtt_topic := MQTTTopic} <- TopicMapping],
+    KafkaTopics = [KafkaTopic || #{kafka_topic := KafkaTopic} <- TopicMapping],
+    NumMQTTTopics = length(MQTTTopics),
+    NPartitions = ?config(num_partitions, Config),
+    ResourceId = resource_id(Config),
+    Payload = emqx_guid:to_hexstr(emqx_guid:gen()),
+    ?check_trace(
+        begin
+            ?assertMatch(
+                {ok, {{_, 201, _}, _, _}},
+                create_bridge_api(Config)
+            ),
+            wait_until_subscribers_are_ready(NPartitions, 40_000),
+            lists:foreach(
+                fun(KafkaTopic) ->
+                    ping_until_healthy(Config, KafkaTopic, _Period = 1_500, _Timeout = 24_000)
+                end,
+                KafkaTopics
+            ),
+            {ok, C} = emqtt:start_link([{proto_ver, v5}]),
+            on_exit(fun() -> emqtt:stop(C) end),
+            {ok, _} = emqtt:connect(C),
+            lists:foreach(
+                fun(MQTTTopic) ->
+                    %% we use the highest QoS so that we can check what
+                    %% the subscription was.
+                    QoS2Granted = 2,
+                    {ok, _, [QoS2Granted]} = emqtt:subscribe(C, MQTTTopic, ?QOS_2)
+                end,
+                MQTTTopics
+            ),
+            {ok, SRef0} =
+                snabbkaffe:subscribe(
+                    ?match_event(#{
+                        ?snk_kind := kafka_consumer_handle_message, ?snk_span := {complete, _}
+                    }),
+                    NumMQTTTopics,
+                    _Timeout0 = 20_000
+                ),
+            lists:foreach(
+                fun(KafkaTopic) ->
+                    publish(Config, KafkaTopic, [
+                        #{
+                            key => <<"mykey">>,
+                            value => Payload,
+                            headers => [{<<"hkey">>, <<"hvalue">>}]
+                        }
+                    ])
+                end,
+                KafkaTopics
+            ),
+            {ok, _} = snabbkaffe:receive_events(SRef0),
+            %% Check that the bridge probe API doesn't leak atoms.
+            ProbeRes0 = probe_bridge_api(Config),
+            ?assertMatch({ok, {{_, 204, _}, _Headers, _Body}}, ProbeRes0),
+            AtomsBefore = erlang:system_info(atom_count),
+            %% Probe again; shouldn't have created more atoms.
+            ProbeRes1 = probe_bridge_api(Config),
+            ?assertMatch({ok, {{_, 204, _}, _Headers, _Body}}, ProbeRes1),
+            AtomsAfter = erlang:system_info(atom_count),
+            ?assertEqual(AtomsBefore, AtomsAfter),
+            ok
+        end,
+        fun(Trace) ->
+            %% two messages processed with begin/end events
+            ?assertMatch([_, _, _, _ | _], ?of_kind(kafka_consumer_handle_message, Trace)),
+            Published = receive_published(#{n => NumMQTTTopics}),
+            lists:foreach(
+                fun(
+                    #{
+                        mqtt_topic := MQTTTopic,
+                        qos := MQTTQoS
+                    }
+                ) ->
+                    [Msg] = [
+                        Msg
+                     || Msg = #{topic := T} <- Published,
+                        T =:= MQTTTopic
+                    ],
+                    ?assertMatch(
+                        #{
+                            qos := MQTTQoS,
+                            topic := MQTTTopic,
+                            payload := _
+                        },
+                        Msg
+                    )
+                end,
+                TopicMapping
+            ),
+            %% check that we observed the different payload templates
+            %% as configured.
+            Payloads =
+                lists:sort([
+                    case emqx_json:safe_decode(P, [return_maps]) of
+                        {ok, Decoded} -> Decoded;
+                        {error, _} -> P
+                    end
+                 || #{payload := P} <- Published
+                ]),
+            ?assertMatch(
+                [
+                    #{
+                        <<"headers">> := #{<<"hkey">> := <<"hvalue">>},
+                        <<"key">> := <<"mykey">>,
+                        <<"offset">> := Offset,
+                        <<"topic">> := KafkaTopic,
+                        <<"ts">> := TS,
+                        <<"ts_type">> := <<"create">>,
+                        <<"value">> := Payload
+                    },
+                    <<"v = ", Payload/binary>>
+                ] when is_integer(Offset) andalso is_integer(TS) andalso is_binary(KafkaTopic),
+                Payloads
+            ),
+            ?assertEqual(2, emqx_resource_metrics:received_get(ResourceId)),
+            ok
+        end
+    ),
+    ok.

 t_on_get_status(Config) ->
     case proplists:get_bool(skip_does_not_apply, Config) of
         true ->