diff --git a/apps/emqx_bridge_azure_event_hub/mix.exs b/apps/emqx_bridge_azure_event_hub/mix.exs
index 42edddbbe..8f5068d0e 100644
--- a/apps/emqx_bridge_azure_event_hub/mix.exs
+++ b/apps/emqx_bridge_azure_event_hub/mix.exs
@@ -23,7 +23,7 @@ defmodule EMQXBridgeAzureEventHub.MixProject do
def deps() do
[
- {:wolff, github: "kafka4beam/wolff", tag: "2.0.0"},
+ {:wolff, github: "kafka4beam/wolff", tag: "3.0.2"},
{:kafka_protocol, github: "kafka4beam/kafka_protocol", tag: "4.1.5", override: true},
{:brod_gssapi, github: "kafka4beam/brod_gssapi", tag: "v0.1.1"},
{:brod, github: "kafka4beam/brod", tag: "3.18.0"},
diff --git a/apps/emqx_bridge_azure_event_hub/rebar.config b/apps/emqx_bridge_azure_event_hub/rebar.config
index 76ea7fa6c..c8be2a6a3 100644
--- a/apps/emqx_bridge_azure_event_hub/rebar.config
+++ b/apps/emqx_bridge_azure_event_hub/rebar.config
@@ -2,7 +2,7 @@
{erl_opts, [debug_info]}.
{deps, [
- {wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "2.0.0"}}},
+ {wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "3.0.2"}}},
{kafka_protocol, {git, "https://github.com/kafka4beam/kafka_protocol.git", {tag, "4.1.5"}}},
{brod_gssapi, {git, "https://github.com/kafka4beam/brod_gssapi.git", {tag, "v0.1.1"}}},
{brod, {git, "https://github.com/kafka4beam/brod.git", {tag, "3.18.0"}}},
diff --git a/apps/emqx_bridge_azure_event_hub/test/emqx_bridge_azure_event_hub_v2_SUITE.erl b/apps/emqx_bridge_azure_event_hub/test/emqx_bridge_azure_event_hub_v2_SUITE.erl
index 661b8819c..f2a06cf65 100644
--- a/apps/emqx_bridge_azure_event_hub/test/emqx_bridge_azure_event_hub_v2_SUITE.erl
+++ b/apps/emqx_bridge_azure_event_hub/test/emqx_bridge_azure_event_hub_v2_SUITE.erl
@@ -382,12 +382,31 @@ t_multiple_actions_sharing_topic(Config) ->
ActionConfig0,
#{<<"parameters">> => #{<<"query_mode">> => <<"sync">>}}
),
- ok = emqx_bridge_v2_kafka_producer_SUITE:t_multiple_actions_sharing_topic(
- [
- {type, ?BRIDGE_TYPE_BIN},
- {connector_name, ?config(connector_name, Config)},
- {connector_config, ?config(connector_config, Config)},
- {action_config, ActionConfig}
- ]
- ),
+ ok =
+ emqx_bridge_v2_kafka_producer_SUITE:?FUNCTION_NAME(
+ [
+ {type, ?BRIDGE_TYPE_BIN},
+ {connector_name, ?config(connector_name, Config)},
+ {connector_config, ?config(connector_config, Config)},
+ {action_config, ActionConfig}
+ ]
+ ),
+ ok.
+
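+%% Delegates to the shared Kafka producer suite case of the same name, resolved
+%% via `?FUNCTION_NAME', running it against the Azure Event Hub action type.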
+t_dynamic_topics(Config) ->
+ ActionConfig0 = ?config(action_config, Config),
+ ActionConfig =
+ emqx_utils_maps:deep_merge(
+ ActionConfig0,
+ #{<<"parameters">> => #{<<"query_mode">> => <<"sync">>}}
+ ),
+ ok =
+ emqx_bridge_v2_kafka_producer_SUITE:?FUNCTION_NAME(
+ [
+ {type, ?BRIDGE_TYPE_BIN},
+ {connector_name, ?config(connector_name, Config)},
+ {connector_config, ?config(connector_config, Config)},
+ {action_config, ActionConfig}
+ ]
+ ),
ok.
diff --git a/apps/emqx_bridge_confluent/mix.exs b/apps/emqx_bridge_confluent/mix.exs
index 46cbe9a02..134e924fc 100644
--- a/apps/emqx_bridge_confluent/mix.exs
+++ b/apps/emqx_bridge_confluent/mix.exs
@@ -23,7 +23,7 @@ defmodule EMQXBridgeConfluent.MixProject do
def deps() do
[
- {:wolff, github: "kafka4beam/wolff", tag: "2.0.0"},
+ {:wolff, github: "kafka4beam/wolff", tag: "3.0.2"},
{:kafka_protocol, github: "kafka4beam/kafka_protocol", tag: "4.1.5", override: true},
{:brod_gssapi, github: "kafka4beam/brod_gssapi", tag: "v0.1.1"},
{:brod, github: "kafka4beam/brod", tag: "3.18.0"},
diff --git a/apps/emqx_bridge_confluent/rebar.config b/apps/emqx_bridge_confluent/rebar.config
index 1a91f501d..786b1cf82 100644
--- a/apps/emqx_bridge_confluent/rebar.config
+++ b/apps/emqx_bridge_confluent/rebar.config
@@ -2,7 +2,7 @@
{erl_opts, [debug_info]}.
{deps, [
- {wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "2.0.0"}}},
+ {wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "3.0.2"}}},
{kafka_protocol, {git, "https://github.com/kafka4beam/kafka_protocol.git", {tag, "4.1.5"}}},
{brod_gssapi, {git, "https://github.com/kafka4beam/brod_gssapi.git", {tag, "v0.1.1"}}},
{brod, {git, "https://github.com/kafka4beam/brod.git", {tag, "3.18.0"}}},
diff --git a/apps/emqx_bridge_confluent/test/emqx_bridge_confluent_producer_SUITE.erl b/apps/emqx_bridge_confluent/test/emqx_bridge_confluent_producer_SUITE.erl
index 0b3a22a99..f10e88463 100644
--- a/apps/emqx_bridge_confluent/test/emqx_bridge_confluent_producer_SUITE.erl
+++ b/apps/emqx_bridge_confluent/test/emqx_bridge_confluent_producer_SUITE.erl
@@ -391,12 +391,31 @@ t_multiple_actions_sharing_topic(Config) ->
ActionConfig0,
#{<<"parameters">> => #{<<"query_mode">> => <<"sync">>}}
),
- ok = emqx_bridge_v2_kafka_producer_SUITE:t_multiple_actions_sharing_topic(
- [
- {type, ?ACTION_TYPE_BIN},
- {connector_name, ?config(connector_name, Config)},
- {connector_config, ?config(connector_config, Config)},
- {action_config, ActionConfig}
- ]
- ),
+ ok =
+ emqx_bridge_v2_kafka_producer_SUITE:?FUNCTION_NAME(
+ [
+ {type, ?ACTION_TYPE_BIN},
+ {connector_name, ?config(connector_name, Config)},
+ {connector_config, ?config(connector_config, Config)},
+ {action_config, ActionConfig}
+ ]
+ ),
+ ok.
+
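+%% Delegates to the shared Kafka producer suite case of the same name, resolved
+%% via `?FUNCTION_NAME', running it against the Confluent action type.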
+t_dynamic_topics(Config) ->
+ ActionConfig0 = ?config(action_config, Config),
+ ActionConfig =
+ emqx_utils_maps:deep_merge(
+ ActionConfig0,
+ #{<<"parameters">> => #{<<"query_mode">> => <<"sync">>}}
+ ),
+ ok =
+ emqx_bridge_v2_kafka_producer_SUITE:?FUNCTION_NAME(
+ [
+ {type, ?ACTION_TYPE_BIN},
+ {connector_name, ?config(connector_name, Config)},
+ {connector_config, ?config(connector_config, Config)},
+ {action_config, ActionConfig}
+ ]
+ ),
ok.
diff --git a/apps/emqx_bridge_kafka/mix.exs b/apps/emqx_bridge_kafka/mix.exs
index b74b1fdd0..a1a59cb08 100644
--- a/apps/emqx_bridge_kafka/mix.exs
+++ b/apps/emqx_bridge_kafka/mix.exs
@@ -23,7 +23,7 @@ defmodule EMQXBridgeKafka.MixProject do
def deps() do
[
- {:wolff, github: "kafka4beam/wolff", tag: "2.0.0"},
+ {:wolff, github: "kafka4beam/wolff", tag: "3.0.2"},
{:kafka_protocol, github: "kafka4beam/kafka_protocol", tag: "4.1.5", override: true},
{:brod_gssapi, github: "kafka4beam/brod_gssapi", tag: "v0.1.1"},
{:brod, github: "kafka4beam/brod", tag: "3.18.0"},
diff --git a/apps/emqx_bridge_kafka/rebar.config b/apps/emqx_bridge_kafka/rebar.config
index b89c9190f..77d9b95ef 100644
--- a/apps/emqx_bridge_kafka/rebar.config
+++ b/apps/emqx_bridge_kafka/rebar.config
@@ -2,7 +2,7 @@
{erl_opts, [debug_info]}.
{deps, [
- {wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "2.0.0"}}},
+ {wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "3.0.2"}}},
{kafka_protocol, {git, "https://github.com/kafka4beam/kafka_protocol.git", {tag, "4.1.5"}}},
{brod_gssapi, {git, "https://github.com/kafka4beam/brod_gssapi.git", {tag, "v0.1.1"}}},
{brod, {git, "https://github.com/kafka4beam/brod.git", {tag, "3.18.0"}}},
diff --git a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl
index 83bc33266..254e84036 100644
--- a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl
+++ b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl
@@ -295,6 +295,7 @@ fields("config_producer") ->
fields("config_consumer") ->
fields(kafka_consumer);
fields(kafka_producer) ->
+    %% Schema used by V1 bridges.
connector_config_fields() ++ producer_opts(v1);
fields(kafka_producer_action) ->
[
@@ -364,9 +365,33 @@ fields(socket_opts) ->
validator => fun emqx_schema:validate_tcp_keepalive/1
})}
];
+fields(v1_producer_kafka_opts) ->
+ OldSchemaFields =
+ [
+ topic,
+ message,
+ max_batch_bytes,
+ compression,
+ partition_strategy,
+ required_acks,
+ kafka_headers,
+ kafka_ext_headers,
+ kafka_header_value_encode_mode,
+ partition_count_refresh_interval,
+ partitions_limit,
+ max_inflight,
+ buffer,
+ query_mode,
+ sync_query_timeout
+ ],
+ Fields = fields(producer_kafka_opts),
+ lists:filter(
+ fun({K, _V}) -> lists:member(K, OldSchemaFields) end,
+ Fields
+ );
fields(producer_kafka_opts) ->
[
- {topic, mk(string(), #{required => true, desc => ?DESC(kafka_topic)})},
+ {topic, mk(emqx_schema:template(), #{required => true, desc => ?DESC(kafka_topic)})},
{message, mk(ref(kafka_message), #{required => false, desc => ?DESC(kafka_message)})},
{max_batch_bytes,
mk(emqx_schema:bytesize(), #{default => <<"896KB">>, desc => ?DESC(max_batch_bytes)})},
@@ -675,15 +700,15 @@ resource_opts() ->
%% However we need to keep it backward compatible for generated schema json (version 0.1.0)
%% since schema is data for the 'schemas' API.
parameters_field(ActionOrBridgeV1) ->
- {Name, Alias} =
+ {Name, Alias, Ref} =
case ActionOrBridgeV1 of
v1 ->
- {kafka, parameters};
+ {kafka, parameters, v1_producer_kafka_opts};
action ->
- {parameters, kafka}
+ {parameters, kafka, producer_kafka_opts}
end,
{Name,
- mk(ref(producer_kafka_opts), #{
+ mk(ref(Ref), #{
required => true,
aliases => [Alias],
desc => ?DESC(producer_kafka_opts),
diff --git a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl
index 6d88a329e..fb7fce63c 100644
--- a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl
+++ b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl
@@ -3,6 +3,8 @@
%%--------------------------------------------------------------------
-module(emqx_bridge_kafka_impl_producer).
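+%% `maybe' expressions (OTP 25+) power the conditional topic checks in
+%% `check_topic_and_leader_connections/4', which only run when the topic is a
+%% fixed binary rather than a dynamic template.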
+-feature(maybe_expr, enable).
+
-behaviour(emqx_resource).
-include_lib("emqx_resource/include/emqx_resource.hrl").
@@ -122,8 +124,8 @@ on_add_channel(
{ok, NewState}.
create_producers_for_bridge_v2(
- InstId,
- BridgeV2Id,
+ ConnResId,
+ ActionResId,
ClientId,
#{
bridge_type := BridgeType,
@@ -132,33 +134,42 @@ create_producers_for_bridge_v2(
) ->
#{
message := MessageTemplate,
- topic := KafkaTopic,
+ topic := KafkaTopic0,
sync_query_timeout := SyncQueryTimeout
} = KafkaConfig,
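+    %% Classify the configured topic: a literal topic is `fixed'; a topic with
+    %% placeholders (e.g. `t-${payload.t}') is `dynamic' and rendered per message.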
+ TopicTemplate = {TopicType, TopicOrTemplate} = maybe_preproc_topic(KafkaTopic0),
+ MKafkaTopic =
+ case TopicType of
+ fixed -> TopicOrTemplate;
+ dynamic -> dynamic
+ end,
KafkaHeadersTokens = preproc_kafka_headers(maps:get(kafka_headers, KafkaConfig, undefined)),
KafkaExtHeadersTokens = preproc_ext_headers(maps:get(kafka_ext_headers, KafkaConfig, [])),
KafkaHeadersValEncodeMode = maps:get(kafka_header_value_encode_mode, KafkaConfig, none),
MaxPartitions = maps:get(partitions_limit, KafkaConfig, all_partitions),
- #{name := BridgeName} = emqx_bridge_v2:parse_id(BridgeV2Id),
- IsDryRun = emqx_resource:is_dry_run(BridgeV2Id),
- ok = check_topic_and_leader_connections(ClientId, KafkaTopic, MaxPartitions),
+ #{name := BridgeName} = emqx_bridge_v2:parse_id(ActionResId),
+ IsDryRun = emqx_resource:is_dry_run(ActionResId),
+ ok = check_topic_and_leader_connections(ActionResId, ClientId, MKafkaTopic, MaxPartitions),
WolffProducerConfig = producers_config(
- BridgeType, BridgeName, KafkaConfig, IsDryRun, BridgeV2Id
+ BridgeType, BridgeName, KafkaConfig, IsDryRun, ActionResId
),
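+    %% Dynamic producers are started without a fixed topic; the topic is passed
+    %% per call to `wolff:send2'/`wolff:send_sync2' when each message is sent.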
- case wolff:ensure_supervised_producers(ClientId, KafkaTopic, WolffProducerConfig) of
+ case wolff:ensure_supervised_dynamic_producers(ClientId, WolffProducerConfig) of
{ok, Producers} ->
- ok = emqx_resource:allocate_resource(InstId, {?kafka_producers, BridgeV2Id}, Producers),
ok = emqx_resource:allocate_resource(
- InstId, {?kafka_telemetry_id, BridgeV2Id}, BridgeV2Id
+ ConnResId, {?kafka_producers, ActionResId}, Producers
),
- _ = maybe_install_wolff_telemetry_handlers(BridgeV2Id),
+ ok = emqx_resource:allocate_resource(
+ ConnResId, {?kafka_telemetry_id, ActionResId}, ActionResId
+ ),
+ _ = maybe_install_wolff_telemetry_handlers(ActionResId),
{ok, #{
message_template => compile_message_template(MessageTemplate),
kafka_client_id => ClientId,
- kafka_topic => KafkaTopic,
+ topic_template => TopicTemplate,
+ topic => MKafkaTopic,
producers => Producers,
- resource_id => BridgeV2Id,
- connector_resource_id => InstId,
+ resource_id => ActionResId,
+ connector_resource_id => ConnResId,
sync_query_timeout => SyncQueryTimeout,
kafka_config => KafkaConfig,
headers_tokens => KafkaHeadersTokens,
@@ -169,9 +180,9 @@ create_producers_for_bridge_v2(
{error, Reason2} ->
?SLOG(error, #{
msg => "failed_to_start_kafka_producer",
- instance_id => InstId,
+ instance_id => ConnResId,
kafka_client_id => ClientId,
- kafka_topic => KafkaTopic,
+ kafka_topic => MKafkaTopic,
reason => Reason2
}),
throw(
@@ -264,7 +275,9 @@ remove_producers_for_bridge_v2(
ClientId = maps:get(?kafka_client_id, AllocatedResources, no_client_id),
maps:foreach(
fun
- ({?kafka_producers, BridgeV2IdCheck}, Producers) when BridgeV2IdCheck =:= BridgeV2Id ->
+ ({?kafka_producers, BridgeV2IdCheck}, Producers) when
+ BridgeV2IdCheck =:= BridgeV2Id
+ ->
deallocate_producers(ClientId, Producers);
({?kafka_telemetry_id, BridgeV2IdCheck}, TelemetryId) when
BridgeV2IdCheck =:= BridgeV2Id
@@ -297,7 +310,8 @@ on_query(
#{installed_bridge_v2s := BridgeV2Configs} = _ConnectorState
) ->
#{
- message_template := Template,
+ message_template := MessageTemplate,
+ topic_template := TopicTemplate,
producers := Producers,
sync_query_timeout := SyncTimeout,
headers_tokens := KafkaHeadersTokens,
@@ -310,7 +324,8 @@ on_query(
headers_val_encode_mode => KafkaHeadersValEncodeMode
},
try
- KafkaMessage = render_message(Template, KafkaHeaders, Message),
+ KafkaTopic = render_topic(TopicTemplate, Message),
+ KafkaMessage = render_message(MessageTemplate, KafkaHeaders, Message),
?tp(
emqx_bridge_kafka_impl_producer_sync_query,
#{headers_config => KafkaHeaders, instance_id => InstId}
@@ -318,9 +333,15 @@ on_query(
emqx_trace:rendered_action_template(MessageTag, #{
message => KafkaMessage
}),
- do_send_msg(sync, KafkaMessage, Producers, SyncTimeout)
+ do_send_msg(sync, KafkaTopic, KafkaMessage, Producers, SyncTimeout)
catch
- error:{invalid_partition_count, Count, _Partitioner} ->
+ throw:bad_topic ->
+ ?tp("kafka_producer_failed_to_render_topic", #{}),
+ {error, {unrecoverable_error, failed_to_render_topic}};
+ throw:#{cause := unknown_topic_or_partition, topic := Topic} ->
+ ?tp("kafka_producer_resolved_to_unknown_topic", #{}),
+ {error, {unrecoverable_error, {resolved_to_unknown_topic, Topic}}};
+ throw:#{cause := invalid_partition_count, count := Count} ->
?tp("kafka_producer_invalid_partition_count", #{
action_id => MessageTag,
query_mode => sync
@@ -365,6 +386,7 @@ on_query_async(
) ->
#{
message_template := Template,
+ topic_template := TopicTemplate,
producers := Producers,
headers_tokens := KafkaHeadersTokens,
ext_headers_tokens := KafkaExtHeadersTokens,
@@ -376,6 +398,7 @@ on_query_async(
headers_val_encode_mode => KafkaHeadersValEncodeMode
},
try
+ KafkaTopic = render_topic(TopicTemplate, Message),
KafkaMessage = render_message(Template, KafkaHeaders, Message),
?tp(
emqx_bridge_kafka_impl_producer_async_query,
@@ -384,9 +407,15 @@ on_query_async(
emqx_trace:rendered_action_template(MessageTag, #{
message => KafkaMessage
}),
- do_send_msg(async, KafkaMessage, Producers, AsyncReplyFn)
+ do_send_msg(async, KafkaTopic, KafkaMessage, Producers, AsyncReplyFn)
catch
- error:{invalid_partition_count, Count, _Partitioner} ->
+ throw:bad_topic ->
+ ?tp("kafka_producer_failed_to_render_topic", #{}),
+ {error, {unrecoverable_error, failed_to_render_topic}};
+ throw:#{cause := unknown_topic_or_partition, topic := Topic} ->
+ ?tp("kafka_producer_resolved_to_unknown_topic", #{}),
+ {error, {unrecoverable_error, {resolved_to_unknown_topic, Topic}}};
+ throw:#{cause := invalid_partition_count, count := Count} ->
?tp("kafka_producer_invalid_partition_count", #{
action_id => MessageTag,
query_mode => async
@@ -424,9 +453,28 @@ compile_message_template(T) ->
timestamp => preproc_tmpl(TimestampTemplate)
}.
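+%% Parse the configured topic as a template: no placeholders means a fixed topic
+%% that can be checked upfront; otherwise the parsed template is kept for
+%% per-message rendering.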
+maybe_preproc_topic(Topic) ->
+ Template = emqx_template:parse(Topic),
+ case emqx_template:placeholders(Template) of
+ [] ->
+ {fixed, bin(Topic)};
+ [_ | _] ->
+ {dynamic, Template}
+ end.
+
preproc_tmpl(Tmpl) ->
emqx_placeholder:preproc_tmpl(Tmpl).
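+%% Resolve the per-message topic. Rendering is strict: a placeholder that cannot
+%% be resolved from the message raises, surfaced as `bad_topic', which the query
+%% callbacks map to an unrecoverable error (the message is not retried).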
+render_topic({fixed, KafkaTopic}, _Message) ->
+ KafkaTopic;
+render_topic({dynamic, Template}, Message) ->
+ try
+ iolist_to_binary(emqx_template:render_strict(Template, Message))
+ catch
+ error:_Errors ->
+ throw(bad_topic)
+ end.
+
render_message(
#{key := KeyTemplate, value := ValueTemplate, timestamp := TimestampTemplate},
#{
@@ -468,9 +516,11 @@ render_timestamp(Template, Message) ->
erlang:system_time(millisecond)
end.
-do_send_msg(sync, KafkaMessage, Producers, SyncTimeout) ->
+do_send_msg(sync, KafkaTopic, KafkaMessage, Producers, SyncTimeout) ->
try
- {_Partition, _Offset} = wolff:send_sync(Producers, [KafkaMessage], SyncTimeout),
+ {_Partition, _Offset} = wolff:send_sync2(
+ Producers, KafkaTopic, [KafkaMessage], SyncTimeout
+ ),
ok
catch
error:{producer_down, _} = Reason ->
@@ -478,7 +528,7 @@ do_send_msg(sync, KafkaMessage, Producers, SyncTimeout) ->
error:timeout ->
{error, timeout}
end;
-do_send_msg(async, KafkaMessage, Producers, AsyncReplyFn) ->
+do_send_msg(async, KafkaTopic, KafkaMessage, Producers, AsyncReplyFn) ->
%% * Must be a batch because wolff:send and wolff:send_sync are batch APIs
%% * Must be a single element batch because wolff books calls, but not batch sizes
%% for counters and gauges.
@@ -486,7 +536,9 @@ do_send_msg(async, KafkaMessage, Producers, AsyncReplyFn) ->
%% The returned information is discarded here.
%% If the producer process is down when sending, this function would
%% raise an error exception which is to be caught by the caller of this callback
- {_Partition, Pid} = wolff:send(Producers, Batch, {fun ?MODULE:on_kafka_ack/3, [AsyncReplyFn]}),
+ {_Partition, Pid} = wolff:send2(
+ Producers, KafkaTopic, Batch, {fun ?MODULE:on_kafka_ack/3, [AsyncReplyFn]}
+ ),
%% this Pid is so far never used because Kafka producer is by-passing the buffer worker
{ok, Pid}.
@@ -527,20 +579,23 @@ on_get_status(
end.
on_get_channel_status(
- _ResId,
- ChannelId,
+ _ConnResId,
+ ActionResId,
#{
client_id := ClientId,
installed_bridge_v2s := Channels
- } = _State
+ } = _ConnState
) ->
%% Note: we must avoid returning `?status_disconnected' here. Returning
%% `?status_disconnected' will make resource manager try to restart the producers /
%% connector, thus potentially dropping data held in wolff producer's replayq. The
%% only exception is if the topic does not exist ("unhealthy target").
- #{kafka_topic := KafkaTopic, partitions_limit := MaxPartitions} = maps:get(ChannelId, Channels),
+ #{
+ topic := MKafkaTopic,
+ partitions_limit := MaxPartitions
+ } = maps:get(ActionResId, Channels),
try
- ok = check_topic_and_leader_connections(ClientId, KafkaTopic, MaxPartitions),
+ ok = check_topic_and_leader_connections(ActionResId, ClientId, MKafkaTopic, MaxPartitions),
?status_connected
catch
throw:{unhealthy_target, Msg} ->
@@ -549,22 +604,29 @@ on_get_channel_status(
{?status_connecting, {K, E}}
end.
-check_topic_and_leader_connections(ClientId, KafkaTopic, MaxPartitions) ->
+check_topic_and_leader_connections(ActionResId, ClientId, MKafkaTopic, MaxPartitions) ->
case wolff_client_sup:find_client(ClientId) of
{ok, Pid} ->
- ok = check_topic_status(ClientId, Pid, KafkaTopic),
- ok = check_if_healthy_leaders(ClientId, Pid, KafkaTopic, MaxPartitions);
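+            %% `MKafkaTopic' is either a concrete topic or the atom `dynamic'.
+            %% For dynamic topics there is no single topic to probe, so the
+            %% checks are skipped here.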
+ maybe
+ true ?= is_binary(MKafkaTopic),
+ ok = check_topic_status(ClientId, Pid, MKafkaTopic),
+ ok = check_if_healthy_leaders(
+ ActionResId, ClientId, Pid, MKafkaTopic, MaxPartitions
+ )
+ else
+ false -> ok
+ end;
{error, #{reason := no_such_client}} ->
throw(#{
reason => cannot_find_kafka_client,
kafka_client => ClientId,
- kafka_topic => KafkaTopic
+ kafka_topic => MKafkaTopic
});
{error, #{reason := client_supervisor_not_initialized}} ->
throw(#{
reason => restarting,
kafka_client => ClientId,
- kafka_topic => KafkaTopic
+ kafka_topic => MKafkaTopic
})
end.
@@ -591,8 +653,10 @@ error_summary(Map, [Error]) ->
error_summary(Map, [Error | More]) ->
Map#{first_error => Error, total_errors => length(More) + 1}.
-check_if_healthy_leaders(ClientId, ClientPid, KafkaTopic, MaxPartitions) when is_pid(ClientPid) ->
- case wolff_client:get_leader_connections(ClientPid, KafkaTopic, MaxPartitions) of
+check_if_healthy_leaders(ActionResId, ClientId, ClientPid, KafkaTopic, MaxPartitions) when
+ is_pid(ClientPid)
+->
+ case wolff_client:get_leader_connections(ClientPid, ActionResId, KafkaTopic, MaxPartitions) of
{ok, Leaders} ->
%% Kafka is considered healthy as long as any of the partition leader is reachable.
case lists:partition(fun({_Partition, Pid}) -> is_alive(Pid) end, Leaders) of
@@ -654,7 +718,7 @@ ssl(#{enable := true} = SSL) ->
ssl(_) ->
false.
-producers_config(BridgeType, BridgeName, Input, IsDryRun, BridgeV2Id) ->
+producers_config(BridgeType, BridgeName, Input, IsDryRun, ActionResId) ->
#{
max_batch_bytes := MaxBatchBytes,
compression := Compression,
@@ -696,8 +760,8 @@ producers_config(BridgeType, BridgeName, Input, IsDryRun, BridgeV2Id) ->
max_batch_bytes => MaxBatchBytes,
max_send_ahead => MaxInflight - 1,
compression => Compression,
- alias => BridgeV2Id,
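+        %% wolff 3.x groups dynamic producers under `group' (the 2.x `alias').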
- telemetry_meta_data => #{bridge_id => BridgeV2Id},
+ group => ActionResId,
+ telemetry_meta_data => #{bridge_id => ActionResId},
max_partitions => MaxPartitions
}.
@@ -773,20 +837,19 @@ handle_telemetry_event(_EventId, _Metrics, _MetaData, _HandlerConfig) ->
%% Note: don't use the instance/manager ID, as that changes every time
%% the bridge is recreated, and will lead to multiplication of
%% metrics.
--spec telemetry_handler_id(resource_id()) -> binary().
-telemetry_handler_id(ResourceID) ->
- <<"emqx-bridge-kafka-producer-", ResourceID/binary>>.
+-spec telemetry_handler_id(action_resource_id()) -> binary().
+telemetry_handler_id(ActionResId) ->
+ ActionResId.
-uninstall_telemetry_handlers(ResourceID) ->
- HandlerID = telemetry_handler_id(ResourceID),
- telemetry:detach(HandlerID).
+uninstall_telemetry_handlers(TelemetryId) ->
+ telemetry:detach(TelemetryId).
-maybe_install_wolff_telemetry_handlers(ResourceID) ->
+maybe_install_wolff_telemetry_handlers(TelemetryId) ->
%% Attach event handlers for Kafka telemetry events. If a handler with the
%% handler id already exists, the attach_many function does nothing
telemetry:attach_many(
%% unique handler id
- telemetry_handler_id(ResourceID),
+ telemetry_handler_id(TelemetryId),
[
[wolff, dropped_queue_full],
[wolff, queuing],
@@ -798,7 +861,7 @@ maybe_install_wolff_telemetry_handlers(ResourceID) ->
%% wolff producers; otherwise, multiple kafka producer bridges
%% will install multiple handlers to the same wolff events,
%% multiplying the metric counts...
- #{bridge_id => ResourceID}
+ #{bridge_id => TelemetryId}
).
preproc_kafka_headers(HeadersTmpl) when HeadersTmpl =:= <<>>; HeadersTmpl =:= undefined ->
diff --git a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_producer_action_info.erl b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_producer_action_info.erl
index d97e68ba6..b9e13e717 100644
--- a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_producer_action_info.erl
+++ b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_producer_action_info.erl
@@ -26,7 +26,12 @@ schema_module() -> emqx_bridge_kafka.
connector_action_config_to_bridge_v1_config(ConnectorConfig, ActionConfig) ->
BridgeV1Config1 = maps:remove(<<"connector">>, ActionConfig),
BridgeV1Config2 = emqx_utils_maps:deep_merge(ConnectorConfig, BridgeV1Config1),
- emqx_utils_maps:rename(<<"parameters">>, <<"kafka">>, BridgeV1Config2).
+ BridgeV1Config = emqx_utils_maps:rename(<<"parameters">>, <<"kafka">>, BridgeV1Config2),
+ maps:update_with(
+ <<"kafka">>,
+ fun(Params) -> maps:with(v1_parameters(), Params) end,
+ BridgeV1Config
+ ).
bridge_v1_config_to_action_config(BridgeV1Conf0 = #{<<"producer">> := _}, ConnectorName) ->
%% Ancient v1 config, when `kafka' key was wrapped by `producer'
@@ -51,6 +56,12 @@ bridge_v1_config_to_action_config(BridgeV1Conf, ConnectorName) ->
%% Internal helper functions
%%------------------------------------------------------------------------------------------
+v1_parameters() ->
+ [
+ to_bin(K)
+ || {K, _} <- emqx_bridge_kafka:fields(v1_producer_kafka_opts)
+ ].
+
producer_action_field_keys() ->
[
to_bin(K)
diff --git a/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_consumer_SUITE.erl b/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_consumer_SUITE.erl
index 56aabb1c3..9119ee6c4 100644
--- a/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_consumer_SUITE.erl
+++ b/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_consumer_SUITE.erl
@@ -477,7 +477,7 @@ do_start_producer(KafkaClientId, KafkaTopic) ->
ProducerConfig =
#{
name => Name,
- partitioner => roundrobin,
+ partitioner => random,
partition_count_refresh_interval_seconds => 1_000,
replayq_max_total_bytes => 10_000,
replayq_seg_bytes => 9_000,
@@ -1520,7 +1520,7 @@ t_receive_after_recovery(Config) ->
key => <<"commit", (integer_to_binary(N))/binary>>,
value => <<"commit", (integer_to_binary(N))/binary>>
}
- || N <- lists:seq(1, NPartitions)
+ || N <- lists:seq(1, NPartitions * 10)
],
%% we do distinct passes over this producing part so that
%% wolff won't batch everything together.
@@ -1933,7 +1933,7 @@ t_node_joins_existing_cluster(Config) ->
Val = <<"v", (integer_to_binary(N))/binary>>,
publish(Config, KafkaTopic, [#{key => Key, value => Val}])
end,
- lists:seq(1, NPartitions)
+ lists:seq(1, 10 * NPartitions)
),
{ok, _} = snabbkaffe:receive_events(SRef1),
diff --git a/apps/emqx_bridge_kafka/test/emqx_bridge_v2_kafka_producer_SUITE.erl b/apps/emqx_bridge_kafka/test/emqx_bridge_v2_kafka_producer_SUITE.erl
index c26f5e94e..08b2723e7 100644
--- a/apps/emqx_bridge_kafka/test/emqx_bridge_v2_kafka_producer_SUITE.erl
+++ b/apps/emqx_bridge_kafka/test/emqx_bridge_v2_kafka_producer_SUITE.erl
@@ -23,6 +23,7 @@
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
-include_lib("brod/include/brod.hrl").
-include_lib("emqx_resource/include/emqx_resource.hrl").
+-include_lib("emqx/include/asserts.hrl").
-import(emqx_common_test_helpers, [on_exit/1]).
@@ -165,6 +166,9 @@ send_message(Type, ActionName) ->
resolve_kafka_offset() ->
KafkaTopic = emqx_bridge_kafka_impl_producer_SUITE:test_topic_one_partition(),
+ resolve_kafka_offset(KafkaTopic).
+
+resolve_kafka_offset(KafkaTopic) ->
Partition = 0,
Hosts = emqx_bridge_kafka_impl_producer_SUITE:kafka_hosts(),
{ok, Offset0} = emqx_bridge_kafka_impl_producer_SUITE:resolve_kafka_offset(
@@ -174,11 +178,32 @@ resolve_kafka_offset() ->
check_kafka_message_payload(Offset, ExpectedPayload) ->
KafkaTopic = emqx_bridge_kafka_impl_producer_SUITE:test_topic_one_partition(),
+ check_kafka_message_payload(KafkaTopic, Offset, ExpectedPayload).
+
+check_kafka_message_payload(KafkaTopic, Offset, ExpectedPayload) ->
Partition = 0,
Hosts = emqx_bridge_kafka_impl_producer_SUITE:kafka_hosts(),
{ok, {_, [KafkaMsg0]}} = brod:fetch(Hosts, KafkaTopic, Partition, Offset),
?assertMatch(#kafka_message{value = ExpectedPayload}, KafkaMsg0).
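+%% Pre-creates a topic used by the dynamic-topic tests, tolerating the topic
+%% already existing so the suite can be re-run against the same broker.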
+ensure_kafka_topic(KafkaTopic) ->
+ TopicConfigs = [
+ #{
+ name => KafkaTopic,
+ num_partitions => 1,
+ replication_factor => 1,
+ assignments => [],
+ configs => []
+ }
+ ],
+ RequestConfig = #{timeout => 5_000},
+ ConnConfig = #{},
+ Endpoints = emqx_bridge_kafka_impl_producer_SUITE:kafka_hosts(),
+ case brod:create_topics(Endpoints, TopicConfigs, RequestConfig, ConnConfig) of
+ ok -> ok;
+ {error, topic_already_exists} -> ok
+ end.
+
action_config(ConnectorName) ->
action_config(ConnectorName, _Overrides = #{}).
@@ -728,9 +753,13 @@ t_invalid_partition_count_metrics(Config) ->
%% Simulate `invalid_partition_count'
emqx_common_test_helpers:with_mock(
wolff,
- send_sync,
- fun(_Producers, _Msgs, _Timeout) ->
- error({invalid_partition_count, 0, partitioner})
+ send_sync2,
+ fun(_Producers, _Topic, _Msgs, _Timeout) ->
+ throw(#{
+ cause => invalid_partition_count,
+ count => 0,
+ partitioner => partitioner
+ })
end,
fun() ->
{{ok, _}, {ok, _}} =
@@ -773,9 +802,13 @@ t_invalid_partition_count_metrics(Config) ->
%% Simulate `invalid_partition_count'
emqx_common_test_helpers:with_mock(
wolff,
- send,
- fun(_Producers, _Msgs, _Timeout) ->
- error({invalid_partition_count, 0, partitioner})
+ send2,
+ fun(_Producers, _Topic, _Msgs, _AckCallback) ->
+ throw(#{
+ cause => invalid_partition_count,
+ count => 0,
+ partitioner => partitioner
+ })
end,
fun() ->
{{ok, _}, {ok, _}} =
@@ -881,3 +914,131 @@ t_multiple_actions_sharing_topic(Config) ->
end
),
ok.
+
+%% Smoke tests for using a templated topic, i.e. dynamic Kafka topics.
+t_dynamic_topics(Config) ->
+ Type = proplists:get_value(type, Config, ?TYPE),
+ ConnectorName = proplists:get_value(connector_name, Config, <<"c">>),
+ ConnectorConfig = proplists:get_value(connector_config, Config, connector_config()),
+ ActionName = <<"dynamic_topics">>,
+ ActionConfig1 = proplists:get_value(action_config, Config, action_config(ConnectorName)),
+ PreConfiguredTopic1 = <<"pct1">>,
+ PreConfiguredTopic2 = <<"pct2">>,
+ ensure_kafka_topic(PreConfiguredTopic1),
+ ensure_kafka_topic(PreConfiguredTopic2),
+ ActionConfig = emqx_bridge_v2_testlib:parse_and_check(
+ action,
+ Type,
+ ActionName,
+ emqx_utils_maps:deep_merge(
+ ActionConfig1,
+ #{
+ <<"parameters">> => #{
+ <<"topic">> => <<"pct${.payload.n}">>,
+ <<"message">> => #{
+ <<"key">> => <<"${.clientid}">>,
+ <<"value">> => <<"${.payload.p}">>
+ }
+ }
+ }
+ )
+ ),
+ ?check_trace(
+ #{timetrap => 7_000},
+ begin
+ ConnectorParams = [
+ {connector_config, ConnectorConfig},
+ {connector_name, ConnectorName},
+ {connector_type, Type}
+ ],
+ ActionParams = [
+ {action_config, ActionConfig},
+ {action_name, ActionName},
+ {action_type, Type}
+ ],
+ {ok, {{_, 201, _}, _, #{}}} =
+ emqx_bridge_v2_testlib:create_connector_api(ConnectorParams),
+
+ {ok, {{_, 201, _}, _, #{}}} =
+ emqx_bridge_v2_testlib:create_action_api(ActionParams),
+ RuleTopic = <<"pct">>,
+ {ok, _} = emqx_bridge_v2_testlib:create_rule_and_action_http(
+ Type,
+ RuleTopic,
+ [
+ {bridge_name, ActionName}
+ ],
+ #{
+ sql =>
+ <<"select *, json_decode(payload) as payload from \"", RuleTopic/binary,
+ "\" ">>
+ }
+ ),
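+            %% The rule SQL decodes the payload, so `${.payload.n}' in the topic
+            %% template and `${.payload.p}' in the message value resolve per message.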
+ ?assertStatusAPI(Type, ActionName, <<"connected">>),
+
+ HandlerId = ?FUNCTION_NAME,
+ TestPid = self(),
+ telemetry:attach_many(
+ HandlerId,
+ emqx_resource_metrics:events(),
+ fun(EventName, Measurements, Metadata, _Config) ->
+ Data = #{
+ name => EventName,
+ measurements => Measurements,
+ metadata => Metadata
+ },
+ TestPid ! {telemetry, Data},
+ ok
+ end,
+ unused_config
+ ),
+ on_exit(fun() -> telemetry:detach(HandlerId) end),
+
+ {ok, C} = emqtt:start_link(#{}),
+ {ok, _} = emqtt:connect(C),
+ Payload = fun(Map) -> emqx_utils_json:encode(Map) end,
+ Offset1 = resolve_kafka_offset(PreConfiguredTopic1),
+ Offset2 = resolve_kafka_offset(PreConfiguredTopic2),
+ {ok, _} = emqtt:publish(C, RuleTopic, Payload(#{n => 1, p => <<"p1">>}), [{qos, 1}]),
+ {ok, _} = emqtt:publish(C, RuleTopic, Payload(#{n => 2, p => <<"p2">>}), [{qos, 1}]),
+
+ check_kafka_message_payload(PreConfiguredTopic1, Offset1, <<"p1">>),
+ check_kafka_message_payload(PreConfiguredTopic2, Offset2, <<"p2">>),
+
+ ActionId = emqx_bridge_v2:id(Type, ActionName),
+ ?assertEqual(2, emqx_resource_metrics:matched_get(ActionId)),
+ ?assertEqual(2, emqx_resource_metrics:success_get(ActionId)),
+ ?assertEqual(0, emqx_resource_metrics:queuing_get(ActionId)),
+
+ ?assertReceive(
+ {telemetry, #{
+ measurements := #{gauge_set := _},
+ metadata := #{worker_id := _, resource_id := ActionId}
+ }}
+ ),
+
+ %% If there isn't enough information in the context to resolve to a topic, it
+ %% should be an unrecoverable error.
+ ?assertMatch(
+ {_, {ok, _}},
+ ?wait_async_action(
+ emqtt:publish(C, RuleTopic, Payload(#{not_enough => <<"info">>}), [{qos, 1}]),
+ #{?snk_kind := "kafka_producer_failed_to_render_topic"}
+ )
+ ),
+
+ %% If it's possible to render the topic, but it isn't in the pre-configured
+ %% list, it should be an unrecoverable error.
+ ?assertMatch(
+ {_, {ok, _}},
+ ?wait_async_action(
+ emqtt:publish(C, RuleTopic, Payload(#{n => 99}), [{qos, 1}]),
+ #{?snk_kind := "kafka_producer_resolved_to_unknown_topic"}
+ )
+ ),
+
+ ok
+ end,
+ []
+ ),
+ ok.
diff --git a/changes/ee/feat-13452.en.md b/changes/ee/feat-13452.en.md
new file mode 100644
index 000000000..7b2427329
--- /dev/null
+++ b/changes/ee/feat-13452.en.md
@@ -0,0 +1,5 @@
+Kafka producer action's `topic` config now supports templates.
+
+The topics must already exist in Kafka. If a message is rendered to a topic that does not exist in Kafka (assuming Kafka has topic auto-creation disabled), the message fails with an unrecoverable error. Likewise, if a message does not contain enough information to render the configured template (e.g., the template is `t-${t}` and the message context does not define `t`), the message fails with an unrecoverable error.
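+For example, with the template `t-${t}`, a message whose rendering context defines `t` as `dev` is published to the Kafka topic `t-dev`.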
+
+This same feature is also available for Azure Event Hubs and Confluent Platform producer integrations.
diff --git a/mix.exs b/mix.exs
index 53e5b304f..399c996a6 100644
--- a/mix.exs
+++ b/mix.exs
@@ -361,7 +361,7 @@ defmodule EMQXUmbrella.MixProject do
{:hstreamdb_erl,
github: "hstreamdb/hstreamdb_erl", tag: "0.5.18+v0.18.1+ezstd-v1.0.5-emqx1"},
{:influxdb, github: "emqx/influxdb-client-erl", tag: "1.1.13", override: true},
- {:wolff, github: "kafka4beam/wolff", tag: "2.0.0"},
+ {:wolff, github: "kafka4beam/wolff", tag: "3.0.2"},
{:kafka_protocol, github: "kafka4beam/kafka_protocol", tag: "4.1.5", override: true},
{:brod_gssapi, github: "kafka4beam/brod_gssapi", tag: "v0.1.1"},
{:brod, github: "kafka4beam/brod", tag: "3.18.0"},
diff --git a/rel/i18n/emqx_bridge_azure_event_hub.hocon b/rel/i18n/emqx_bridge_azure_event_hub.hocon
index 3b96e23e6..7e37d2e4c 100644
--- a/rel/i18n/emqx_bridge_azure_event_hub.hocon
+++ b/rel/i18n/emqx_bridge_azure_event_hub.hocon
@@ -69,7 +69,7 @@ producer_kafka_opts.label:
"""Azure Event Hubs Producer"""
kafka_topic.desc:
-"""Event Hubs name"""
+"""Event Hubs name. Supports templates (e.g.: `t-${payload.t}`)."""
kafka_topic.label:
"""Event Hubs Name"""
diff --git a/rel/i18n/emqx_bridge_confluent_producer.hocon b/rel/i18n/emqx_bridge_confluent_producer.hocon
index 748373691..38623502e 100644
--- a/rel/i18n/emqx_bridge_confluent_producer.hocon
+++ b/rel/i18n/emqx_bridge_confluent_producer.hocon
@@ -69,10 +69,10 @@ producer_kafka_opts.label:
"""Confluent Producer"""
kafka_topic.desc:
-"""Event Hub name"""
+"""Kafka topic name. Supports templates (e.g.: `t-${payload.t}`)."""
kafka_topic.label:
-"""Event Hub Name"""
+"""Kafka Topic Name"""
kafka_message_timestamp.desc:
"""Which timestamp to use. The timestamp is expected to be a millisecond precision Unix epoch which can be in string format, e.g. 1661326462115
or '1661326462115'
. When the desired data field for this template is not found, or if the found data is not a valid integer, the current system timestamp will be used."""
diff --git a/rel/i18n/emqx_bridge_kafka.hocon b/rel/i18n/emqx_bridge_kafka.hocon
index 6e0074ddd..a066d30fc 100644
--- a/rel/i18n/emqx_bridge_kafka.hocon
+++ b/rel/i18n/emqx_bridge_kafka.hocon
@@ -81,7 +81,7 @@ producer_kafka_opts.label:
"""Kafka Producer"""
kafka_topic.desc:
-"""Kafka topic name"""
+"""Kafka topic name. Supports templates (e.g.: `t-${payload.t}`)."""
kafka_topic.label:
"""Kafka Topic Name"""
@@ -446,5 +446,4 @@ server_name_indication.desc:
server_name_indication.label:
"""SNI"""
-
}