feat: make kafka producer freely dynamic

This commit is contained in:
Thales Macedo Garitezi 2024-07-26 14:25:20 -03:00
parent df1f4fad70
commit 4e0742c66f
8 changed files with 73 additions and 231 deletions

View File

@ -382,46 +382,31 @@ t_multiple_actions_sharing_topic(Config) ->
ActionConfig0,
#{<<"parameters">> => #{<<"query_mode">> => <<"sync">>}}
),
ok = emqx_bridge_v2_kafka_producer_SUITE:t_multiple_actions_sharing_topic(
[
{type, ?BRIDGE_TYPE_BIN},
{connector_name, ?config(connector_name, Config)},
{connector_config, ?config(connector_config, Config)},
{action_config, ActionConfig}
]
),
ok =
emqx_bridge_v2_kafka_producer_SUITE:?FUNCTION_NAME(
[
{type, ?BRIDGE_TYPE_BIN},
{connector_name, ?config(connector_name, Config)},
{connector_config, ?config(connector_config, Config)},
{action_config, ActionConfig}
]
),
ok.
t_pre_configured_topics(Config) ->
t_dynamic_topics(Config) ->
ActionConfig0 = ?config(action_config, Config),
ActionConfig =
emqx_utils_maps:deep_merge(
ActionConfig0,
#{<<"parameters">> => #{<<"query_mode">> => <<"sync">>}}
),
ok = emqx_bridge_v2_kafka_producer_SUITE:t_pre_configured_topics(
[
{type, ?BRIDGE_TYPE_BIN},
{connector_name, ?config(connector_name, Config)},
{connector_config, ?config(connector_config, Config)},
{action_config, ActionConfig}
]
),
ok.
t_templated_topic_and_no_pre_configured_topics(Config) ->
ActionConfig0 = ?config(action_config, Config),
ActionConfig =
emqx_utils_maps:deep_merge(
ActionConfig0,
#{<<"parameters">> => #{<<"query_mode">> => <<"sync">>}}
ok =
emqx_bridge_v2_kafka_producer_SUITE:?FUNCTION_NAME(
[
{type, ?BRIDGE_TYPE_BIN},
{connector_name, ?config(connector_name, Config)},
{connector_config, ?config(connector_config, Config)},
{action_config, ActionConfig}
]
),
ok = emqx_bridge_v2_kafka_producer_SUITE:t_templated_topic_and_no_pre_configured_topics(
[
{type, ?BRIDGE_TYPE_BIN},
{connector_name, ?config(connector_name, Config)},
{connector_config, ?config(connector_config, Config)},
{action_config, ActionConfig}
]
),
ok.

View File

@ -391,46 +391,31 @@ t_multiple_actions_sharing_topic(Config) ->
ActionConfig0,
#{<<"parameters">> => #{<<"query_mode">> => <<"sync">>}}
),
ok = emqx_bridge_v2_kafka_producer_SUITE:t_multiple_actions_sharing_topic(
[
{type, ?ACTION_TYPE_BIN},
{connector_name, ?config(connector_name, Config)},
{connector_config, ?config(connector_config, Config)},
{action_config, ActionConfig}
]
),
ok =
emqx_bridge_v2_kafka_producer_SUITE:?FUNCTION_NAME(
[
{type, ?ACTION_TYPE_BIN},
{connector_name, ?config(connector_name, Config)},
{connector_config, ?config(connector_config, Config)},
{action_config, ActionConfig}
]
),
ok.
t_pre_configured_topics(Config) ->
t_dynamic_topics(Config) ->
ActionConfig0 = ?config(action_config, Config),
ActionConfig =
emqx_utils_maps:deep_merge(
ActionConfig0,
#{<<"parameters">> => #{<<"query_mode">> => <<"sync">>}}
),
ok = emqx_bridge_v2_kafka_producer_SUITE:t_pre_configured_topics(
[
{type, ?ACTION_TYPE_BIN},
{connector_name, ?config(connector_name, Config)},
{connector_config, ?config(connector_config, Config)},
{action_config, ActionConfig}
]
),
ok.
t_templated_topic_and_no_pre_configured_topics(Config) ->
ActionConfig0 = ?config(action_config, Config),
ActionConfig =
emqx_utils_maps:deep_merge(
ActionConfig0,
#{<<"parameters">> => #{<<"query_mode">> => <<"sync">>}}
ok =
emqx_bridge_v2_kafka_producer_SUITE:?FUNCTION_NAME(
[
{type, ?ACTION_TYPE_BIN},
{connector_name, ?config(connector_name, Config)},
{connector_config, ?config(connector_config, Config)},
{action_config, ActionConfig}
]
),
ok = emqx_bridge_v2_kafka_producer_SUITE:t_templated_topic_and_no_pre_configured_topics(
[
{type, ?ACTION_TYPE_BIN},
{connector_name, ?config(connector_name, Config)},
{connector_config, ?config(connector_config, Config)},
{action_config, ActionConfig}
]
),
ok.

View File

@ -307,10 +307,6 @@ fields(kafka_producer_action) ->
{tags, emqx_schema:tags_schema()},
{description, emqx_schema:description_schema()}
] ++ producer_opts(action);
fields(pre_configured_topic) ->
[
{topic, mk(string(), #{required => true, desc => ?DESC(kafka_topic)})}
];
fields(kafka_consumer) ->
connector_config_fields() ++ fields(consumer_opts);
fields(ssl_client_opts) ->
@ -396,14 +392,6 @@ fields(v1_producer_kafka_opts) ->
fields(producer_kafka_opts) ->
[
{topic, mk(string(), #{required => true, desc => ?DESC(kafka_topic)})},
{pre_configured_topics,
mk(
hoconsc:array(ref(pre_configured_topic)),
#{
default => [],
desc => ?DESC("producer_pre_configured_topics")
}
)},
{message, mk(ref(kafka_message), #{required => false, desc => ?DESC(kafka_message)})},
{max_batch_bytes,
mk(emqx_schema:bytesize(), #{default => <<"896KB">>, desc => ?DESC(max_batch_bytes)})},

View File

@ -3,6 +3,8 @@
%%--------------------------------------------------------------------
-module(emqx_bridge_kafka_impl_producer).
-feature(maybe_expr, enable).
-behaviour(emqx_resource).
-include_lib("emqx_resource/include/emqx_resource.hrl").
@ -132,37 +134,22 @@ create_producers_for_bridge_v2(
) ->
#{
message := MessageTemplate,
pre_configured_topics := PreConfiguredTopics0,
topic := KafkaTopic0,
sync_query_timeout := SyncQueryTimeout
} = KafkaConfig,
TopicTemplate = {TopicType, KafkaTopic} = maybe_preproc_topic(KafkaTopic0),
PreConfiguredTopics = [T || #{topic := T} <- PreConfiguredTopics0],
KafkaTopics0 =
TopicTemplate = {TopicType, TopicOrTemplate} = maybe_preproc_topic(KafkaTopic0),
MKafkaTopic =
case TopicType of
fixed ->
[KafkaTopic | PreConfiguredTopics];
dynamic ->
PreConfiguredTopics
fixed -> TopicOrTemplate;
dynamic -> dynamic
end,
case KafkaTopics0 of
[] ->
throw(<<
"Either the Kafka topic must be fixed (not a template),"
" or at least one pre-defined topic must be set."
>>);
_ ->
ok
end,
KafkaTopics = lists:map(fun bin/1, KafkaTopics0),
KafkaHeadersTokens = preproc_kafka_headers(maps:get(kafka_headers, KafkaConfig, undefined)),
KafkaExtHeadersTokens = preproc_ext_headers(maps:get(kafka_ext_headers, KafkaConfig, [])),
KafkaHeadersValEncodeMode = maps:get(kafka_header_value_encode_mode, KafkaConfig, none),
MaxPartitions = maps:get(partitions_limit, KafkaConfig, all_partitions),
#{name := BridgeName} = emqx_bridge_v2:parse_id(ActionResId),
IsDryRun = emqx_resource:is_dry_run(ActionResId),
[AKafkaTopic | _] = KafkaTopics,
ok = check_topic_and_leader_connections(ActionResId, ClientId, AKafkaTopic, MaxPartitions),
ok = check_topic_and_leader_connections(ActionResId, ClientId, MKafkaTopic, MaxPartitions),
WolffProducerConfig = producers_config(
BridgeType, BridgeName, KafkaConfig, IsDryRun, ActionResId
),
@ -179,7 +166,7 @@ create_producers_for_bridge_v2(
message_template => compile_message_template(MessageTemplate),
kafka_client_id => ClientId,
topic_template => TopicTemplate,
pre_configured_topics => KafkaTopics,
topic => MKafkaTopic,
producers => Producers,
resource_id => ActionResId,
connector_resource_id => ConnResId,
@ -195,7 +182,7 @@ create_producers_for_bridge_v2(
msg => "failed_to_start_kafka_producer",
instance_id => ConnResId,
kafka_client_id => ClientId,
kafka_topic => KafkaTopic,
kafka_topic => MKafkaTopic,
reason => Reason2
}),
throw(
@ -326,7 +313,6 @@ on_query(
message_template := MessageTemplate,
topic_template := TopicTemplate,
producers := Producers,
pre_configured_topics := PreConfiguredTopics,
sync_query_timeout := SyncTimeout,
headers_tokens := KafkaHeadersTokens,
ext_headers_tokens := KafkaExtHeadersTokens,
@ -339,12 +325,6 @@ on_query(
},
try
KafkaTopic = render_topic(TopicTemplate, Message),
case lists:member(KafkaTopic, PreConfiguredTopics) of
false ->
throw({unknown_topic, KafkaTopic});
true ->
ok
end,
KafkaMessage = render_message(MessageTemplate, KafkaHeaders, Message),
?tp(
emqx_bridge_kafka_impl_producer_sync_query,
@ -358,7 +338,7 @@ on_query(
throw:bad_topic ->
?tp("kafka_producer_failed_to_render_topic", #{}),
{error, {unrecoverable_error, failed_to_render_topic}};
throw:{unknown_topic, Topic} ->
throw:#{cause := unknown_topic_or_partition, topic := Topic} ->
?tp("kafka_producer_resolved_to_unknown_topic", #{}),
{error, {unrecoverable_error, {resolved_to_unknown_topic, Topic}}};
throw:#{cause := invalid_partition_count, count := Count} ->
@ -408,7 +388,6 @@ on_query_async(
message_template := Template,
topic_template := TopicTemplate,
producers := Producers,
pre_configured_topics := PreConfiguredTopics,
headers_tokens := KafkaHeadersTokens,
ext_headers_tokens := KafkaExtHeadersTokens,
headers_val_encode_mode := KafkaHeadersValEncodeMode
@ -420,12 +399,6 @@ on_query_async(
},
try
KafkaTopic = render_topic(TopicTemplate, Message),
case lists:member(KafkaTopic, PreConfiguredTopics) of
false ->
throw({unknown_topic, KafkaTopic});
true ->
ok
end,
KafkaMessage = render_message(Template, KafkaHeaders, Message),
?tp(
emqx_bridge_kafka_impl_producer_async_query,
@ -439,7 +412,7 @@ on_query_async(
throw:bad_topic ->
?tp("kafka_producer_failed_to_render_topic", #{}),
{error, {unrecoverable_error, failed_to_render_topic}};
throw:{unknown_topic, Topic} ->
throw:#{cause := unknown_topic_or_partition, topic := Topic} ->
?tp("kafka_producer_resolved_to_unknown_topic", #{}),
{error, {unrecoverable_error, {resolved_to_unknown_topic, Topic}}};
throw:#{cause := invalid_partition_count, count := Count} ->
@ -618,12 +591,11 @@ on_get_channel_status(
%% connector, thus potentially dropping data held in wolff producer's replayq. The
%% only exception is if the topic does not exist ("unhealthy target").
#{
pre_configured_topics := PreConfiguredTopics,
topic := MKafkaTopic,
partitions_limit := MaxPartitions
} = maps:get(ActionResId, Channels),
[KafkaTopic | _] = PreConfiguredTopics,
try
ok = check_topic_and_leader_connections(ActionResId, ClientId, KafkaTopic, MaxPartitions),
ok = check_topic_and_leader_connections(ActionResId, ClientId, MKafkaTopic, MaxPartitions),
?status_connected
catch
throw:{unhealthy_target, Msg} ->
@ -632,22 +604,29 @@ on_get_channel_status(
{?status_connecting, {K, E}}
end.
check_topic_and_leader_connections(ActionResId, ClientId, KafkaTopic, MaxPartitions) ->
check_topic_and_leader_connections(ActionResId, ClientId, MKafkaTopic, MaxPartitions) ->
case wolff_client_sup:find_client(ClientId) of
{ok, Pid} ->
ok = check_topic_status(ClientId, Pid, KafkaTopic),
ok = check_if_healthy_leaders(ActionResId, ClientId, Pid, KafkaTopic, MaxPartitions);
maybe
true ?= is_binary(MKafkaTopic),
ok = check_topic_status(ClientId, Pid, MKafkaTopic),
ok = check_if_healthy_leaders(
ActionResId, ClientId, Pid, MKafkaTopic, MaxPartitions
)
else
false -> ok
end;
{error, #{reason := no_such_client}} ->
throw(#{
reason => cannot_find_kafka_client,
kafka_client => ClientId,
kafka_topic => KafkaTopic
kafka_topic => MKafkaTopic
});
{error, #{reason := client_supervisor_not_initialized}} ->
throw(#{
reason => restarting,
kafka_client => ClientId,
kafka_topic => KafkaTopic
kafka_topic => MKafkaTopic
})
end.

View File

@ -915,17 +915,17 @@ t_multiple_actions_sharing_topic(Config) ->
),
ok.
%% Smoke tests for using a templated topic and a list of pre-configured kafka topics.
t_pre_configured_topics(Config) ->
%% Smoke tests for using a templated topic and adynamic kafka topics.
t_dynamic_topics(Config) ->
Type = proplists:get_value(type, Config, ?TYPE),
ConnectorName = proplists:get_value(connector_name, Config, <<"c">>),
ConnectorConfig = proplists:get_value(connector_config, Config, connector_config()),
ActionName = <<"pre_configured_topics">>,
ActionName = <<"dynamic_topics">>,
ActionConfig1 = proplists:get_value(action_config, Config, action_config(ConnectorName)),
PreConfigureTopic1 = <<"pct1">>,
PreConfigureTopic2 = <<"pct2">>,
ensure_kafka_topic(PreConfigureTopic1),
ensure_kafka_topic(PreConfigureTopic2),
PreConfiguredTopic1 = <<"pct1">>,
PreConfiguredTopic2 = <<"pct2">>,
ensure_kafka_topic(PreConfiguredTopic1),
ensure_kafka_topic(PreConfiguredTopic2),
ActionConfig = emqx_bridge_v2_testlib:parse_and_check(
action,
Type,
@ -938,11 +938,7 @@ t_pre_configured_topics(Config) ->
<<"message">> => #{
<<"key">> => <<"${.clientid}">>,
<<"value">> => <<"${.payload.p}">>
},
<<"pre_configured_topics">> => [
#{<<"topic">> => PreConfigureTopic1},
#{<<"topic">> => PreConfigureTopic2}
]
}
}
}
)
@ -1001,13 +997,13 @@ t_pre_configured_topics(Config) ->
{ok, C} = emqtt:start_link(#{}),
{ok, _} = emqtt:connect(C),
Payload = fun(Map) -> emqx_utils_json:encode(Map) end,
Offset1 = resolve_kafka_offset(PreConfigureTopic1),
Offset2 = resolve_kafka_offset(PreConfigureTopic2),
Offset1 = resolve_kafka_offset(PreConfiguredTopic1),
Offset2 = resolve_kafka_offset(PreConfiguredTopic2),
{ok, _} = emqtt:publish(C, RuleTopic, Payload(#{n => 1, p => <<"p1">>}), [{qos, 1}]),
{ok, _} = emqtt:publish(C, RuleTopic, Payload(#{n => 2, p => <<"p2">>}), [{qos, 1}]),
check_kafka_message_payload(PreConfigureTopic1, Offset1, <<"p1">>),
check_kafka_message_payload(PreConfigureTopic2, Offset2, <<"p2">>),
check_kafka_message_payload(PreConfiguredTopic1, Offset1, <<"p1">>),
check_kafka_message_payload(PreConfiguredTopic2, Offset2, <<"p2">>),
ActionId = emqx_bridge_v2:id(Type, ActionName),
?assertEqual(2, emqx_resource_metrics:matched_get(ActionId)),
@ -1046,64 +1042,3 @@ t_pre_configured_topics(Config) ->
[]
),
ok.
%% Checks that creating an action with templated topic and no pre-configured kafka topics
%% throws.
t_templated_topic_and_no_pre_configured_topics(Config) ->
Type = proplists:get_value(type, Config, ?TYPE),
ConnectorName = proplists:get_value(connector_name, Config, <<"c">>),
ConnectorConfig = proplists:get_value(connector_config, Config, connector_config()),
ActionName = <<"bad_pre_configured_topics">>,
ActionConfig1 = proplists:get_value(action_config, Config, action_config(ConnectorName)),
ActionConfig = emqx_bridge_v2_testlib:parse_and_check(
action,
Type,
ActionName,
emqx_utils_maps:deep_merge(
ActionConfig1,
#{
<<"parameters">> => #{
<<"topic">> => <<"pct${.payload.n}">>,
<<"pre_configured_topics">> => []
}
}
)
),
?check_trace(
#{timetrap => 7_000},
begin
ConnectorParams = [
{connector_config, ConnectorConfig},
{connector_name, ConnectorName},
{connector_type, Type}
],
ActionParams = [
{action_config, ActionConfig},
{action_name, ActionName},
{action_type, Type}
],
{ok, {{_, 201, _}, _, #{}}} =
emqx_bridge_v2_testlib:create_connector_api(ConnectorParams),
{ok, {{_, 201, _}, _, #{}}} =
emqx_bridge_v2_testlib:create_action_api(ActionParams),
?assertMatch(
{ok,
{{_, 200, _}, _, #{
<<"status_reason">> :=
<<
"Either the Kafka topic must be fixed (not a template),"
" or at least one pre-defined topic must be set."
>>,
<<"status">> := <<"disconnected">>,
<<"node_status">> := [#{<<"status">> := <<"disconnected">>}]
}}},
emqx_bridge_v2_testlib:get_bridge_api(Type, ActionName)
),
ok
end,
[]
),
ok.

View File

@ -350,14 +350,4 @@ Setting this to a value which is greater than the total number of partitions in
partitions_limit.label:
"""Max Partitions"""
producer_pre_configured_topics.label:
"""Pre-configured Event Hubs"""
producer_pre_configured_topics.desc:
"""A list of pre-configured event hubs to be used when using templates to define outgoing topics. If the topic template fails to resolve to a value due to missing data in the incoming message, or if it resolves to a topic that is not contained in this list, then publishing will fail."""
pre_configured_topic.label:
"""Event Hubs Name"""
pre_configured_topic.desc:
"""Event Hubs name"""
}

View File

@ -350,14 +350,4 @@ server_name_indication.desc:
server_name_indication.label:
"""SNI"""
producer_pre_configured_topics.label:
"""Pre-configured Topics"""
producer_pre_configured_topics.desc:
"""A list of pre-configured topics to be used when using templates to define outgoing topics. If the topic template fails to resolve to a value due to missing data in the incoming message, or if it resolves to a topic that is not contained in this list, then publishing will fail."""
pre_configured_topic.label:
"""Kafka Topic Name"""
pre_configured_topic.desc:
"""Kafka topic name"""
}

View File

@ -446,14 +446,4 @@ server_name_indication.desc:
server_name_indication.label:
"""SNI"""
producer_pre_configured_topics.label:
"""Pre-configured Topics"""
producer_pre_configured_topics.desc:
"""A list of pre-configured topics to be used when using templates to define outgoing topics. If the topic template fails to resolve to a value due to missing data in the incoming message, or if it resolves to a topic that is not contained in this list, then publishing will fail."""
pre_configured_topic.label:
"""Kafka Topic Name"""
pre_configured_topic.desc:
"""Kafka topic name"""
}