feat(kafka producer): allow dynamic topics from pre-configured topics
Fixes https://emqx.atlassian.net/browse/EMQX-12656
Parent: 33eccb35da
Commit: df1f4fad70
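For context, the shape of the new action parameters, reconstructed from the test suite further down (the topic template `pct${.payload.n}` and the `pre_configured_topics` entries are the values used in the tests; the map form is what emqx_bridge_v2_testlib:parse_and_check/4 receives there — shown only as an illustrative sketch, not a complete config):

    #{
        <<"parameters">> => #{
            %% Dynamic destination topic, rendered per message from the rule context.
            <<"topic">> => <<"pct${.payload.n}">>,
            %% Allow-list of topics a rendered template may resolve to.
            <<"pre_configured_topics">> => [
                #{<<"topic">> => <<"pct1">>},
                #{<<"topic">> => <<"pct2">>}
            ]
        }
    }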
@@ -391,3 +391,37 @@ t_multiple_actions_sharing_topic(Config) ->
         ]
     ),
     ok.
+
+t_pre_configured_topics(Config) ->
+    ActionConfig0 = ?config(action_config, Config),
+    ActionConfig =
+        emqx_utils_maps:deep_merge(
+            ActionConfig0,
+            #{<<"parameters">> => #{<<"query_mode">> => <<"sync">>}}
+        ),
+    ok = emqx_bridge_v2_kafka_producer_SUITE:t_pre_configured_topics(
+        [
+            {type, ?BRIDGE_TYPE_BIN},
+            {connector_name, ?config(connector_name, Config)},
+            {connector_config, ?config(connector_config, Config)},
+            {action_config, ActionConfig}
+        ]
+    ),
+    ok.
+
+t_templated_topic_and_no_pre_configured_topics(Config) ->
+    ActionConfig0 = ?config(action_config, Config),
+    ActionConfig =
+        emqx_utils_maps:deep_merge(
+            ActionConfig0,
+            #{<<"parameters">> => #{<<"query_mode">> => <<"sync">>}}
+        ),
+    ok = emqx_bridge_v2_kafka_producer_SUITE:t_templated_topic_and_no_pre_configured_topics(
+        [
+            {type, ?BRIDGE_TYPE_BIN},
+            {connector_name, ?config(connector_name, Config)},
+            {connector_config, ?config(connector_config, Config)},
+            {action_config, ActionConfig}
+        ]
+    ),
+    ok.
@@ -400,3 +400,37 @@ t_multiple_actions_sharing_topic(Config) ->
         ]
     ),
     ok.
+
+t_pre_configured_topics(Config) ->
+    ActionConfig0 = ?config(action_config, Config),
+    ActionConfig =
+        emqx_utils_maps:deep_merge(
+            ActionConfig0,
+            #{<<"parameters">> => #{<<"query_mode">> => <<"sync">>}}
+        ),
+    ok = emqx_bridge_v2_kafka_producer_SUITE:t_pre_configured_topics(
+        [
+            {type, ?ACTION_TYPE_BIN},
+            {connector_name, ?config(connector_name, Config)},
+            {connector_config, ?config(connector_config, Config)},
+            {action_config, ActionConfig}
+        ]
+    ),
+    ok.
+
+t_templated_topic_and_no_pre_configured_topics(Config) ->
+    ActionConfig0 = ?config(action_config, Config),
+    ActionConfig =
+        emqx_utils_maps:deep_merge(
+            ActionConfig0,
+            #{<<"parameters">> => #{<<"query_mode">> => <<"sync">>}}
+        ),
+    ok = emqx_bridge_v2_kafka_producer_SUITE:t_templated_topic_and_no_pre_configured_topics(
+        [
+            {type, ?ACTION_TYPE_BIN},
+            {connector_name, ?config(connector_name, Config)},
+            {connector_config, ?config(connector_config, Config)},
+            {action_config, ActionConfig}
+        ]
+    ),
+    ok.
@@ -295,6 +295,7 @@ fields("config_producer") ->
 fields("config_consumer") ->
     fields(kafka_consumer);
 fields(kafka_producer) ->
+    %% Schema used by bridges V1.
     connector_config_fields() ++ producer_opts(v1);
 fields(kafka_producer_action) ->
     [
@@ -306,6 +307,10 @@ fields(kafka_producer_action) ->
         {tags, emqx_schema:tags_schema()},
         {description, emqx_schema:description_schema()}
     ] ++ producer_opts(action);
+fields(pre_configured_topic) ->
+    [
+        {topic, mk(string(), #{required => true, desc => ?DESC(kafka_topic)})}
+    ];
 fields(kafka_consumer) ->
     connector_config_fields() ++ fields(consumer_opts);
 fields(ssl_client_opts) ->
@@ -364,9 +369,41 @@ fields(socket_opts) ->
                 validator => fun emqx_schema:validate_tcp_keepalive/1
             })}
     ];
+fields(v1_producer_kafka_opts) ->
+    OldSchemaFields =
+        [
+            topic,
+            message,
+            max_batch_bytes,
+            compression,
+            partition_strategy,
+            required_acks,
+            kafka_headers,
+            kafka_ext_headers,
+            kafka_header_value_encode_mode,
+            partition_count_refresh_interval,
+            partitions_limit,
+            max_inflight,
+            buffer,
+            query_mode,
+            sync_query_timeout
+        ],
+    Fields = fields(producer_kafka_opts),
+    lists:filter(
+        fun({K, _V}) -> lists:member(K, OldSchemaFields) end,
+        Fields
+    );
 fields(producer_kafka_opts) ->
     [
         {topic, mk(string(), #{required => true, desc => ?DESC(kafka_topic)})},
+        {pre_configured_topics,
+            mk(
+                hoconsc:array(ref(pre_configured_topic)),
+                #{
+                    default => [],
+                    desc => ?DESC("producer_pre_configured_topics")
+                }
+            )},
         {message, mk(ref(kafka_message), #{required => false, desc => ?DESC(kafka_message)})},
         {max_batch_bytes,
             mk(emqx_schema:bytesize(), #{default => <<"896KB">>, desc => ?DESC(max_batch_bytes)})},
@@ -675,15 +712,15 @@ resource_opts() ->
 %% However we need to keep it backward compatible for generated schema json (version 0.1.0)
 %% since schema is data for the 'schemas' API.
 parameters_field(ActionOrBridgeV1) ->
-    {Name, Alias} =
+    {Name, Alias, Ref} =
         case ActionOrBridgeV1 of
             v1 ->
-                {kafka, parameters};
+                {kafka, parameters, v1_producer_kafka_opts};
             action ->
-                {parameters, kafka}
+                {parameters, kafka, producer_kafka_opts}
         end,
     {Name,
-        mk(ref(producer_kafka_opts), #{
+        mk(ref(Ref), #{
            required => true,
            aliases => [Alias],
            desc => ?DESC(producer_kafka_opts),
@@ -122,8 +122,8 @@ on_add_channel(
     {ok, NewState}.

 create_producers_for_bridge_v2(
-    InstId,
-    BridgeV2Id,
+    ConnResId,
+    ActionResId,
     ClientId,
     #{
         bridge_type := BridgeType,
@@ -132,33 +132,57 @@ create_producers_for_bridge_v2(
 ) ->
     #{
         message := MessageTemplate,
-        topic := KafkaTopic,
+        pre_configured_topics := PreConfiguredTopics0,
+        topic := KafkaTopic0,
         sync_query_timeout := SyncQueryTimeout
     } = KafkaConfig,
+    TopicTemplate = {TopicType, KafkaTopic} = maybe_preproc_topic(KafkaTopic0),
+    PreConfiguredTopics = [T || #{topic := T} <- PreConfiguredTopics0],
+    KafkaTopics0 =
+        case TopicType of
+            fixed ->
+                [KafkaTopic | PreConfiguredTopics];
+            dynamic ->
+                PreConfiguredTopics
+        end,
+    case KafkaTopics0 of
+        [] ->
+            throw(<<
+                "Either the Kafka topic must be fixed (not a template),"
+                " or at least one pre-defined topic must be set."
+            >>);
+        _ ->
+            ok
+    end,
+    KafkaTopics = lists:map(fun bin/1, KafkaTopics0),
     KafkaHeadersTokens = preproc_kafka_headers(maps:get(kafka_headers, KafkaConfig, undefined)),
     KafkaExtHeadersTokens = preproc_ext_headers(maps:get(kafka_ext_headers, KafkaConfig, [])),
     KafkaHeadersValEncodeMode = maps:get(kafka_header_value_encode_mode, KafkaConfig, none),
     MaxPartitions = maps:get(partitions_limit, KafkaConfig, all_partitions),
-    #{name := BridgeName} = emqx_bridge_v2:parse_id(BridgeV2Id),
-    IsDryRun = emqx_resource:is_dry_run(BridgeV2Id),
-    ok = check_topic_and_leader_connections(ClientId, KafkaTopic, MaxPartitions),
+    #{name := BridgeName} = emqx_bridge_v2:parse_id(ActionResId),
+    IsDryRun = emqx_resource:is_dry_run(ActionResId),
+    [AKafkaTopic | _] = KafkaTopics,
+    ok = check_topic_and_leader_connections(ActionResId, ClientId, AKafkaTopic, MaxPartitions),
     WolffProducerConfig = producers_config(
-        BridgeType, BridgeName, KafkaConfig, IsDryRun, BridgeV2Id
+        BridgeType, BridgeName, KafkaConfig, IsDryRun, ActionResId
     ),
-    case wolff:ensure_supervised_producers(ClientId, KafkaTopic, WolffProducerConfig) of
+    case wolff:ensure_supervised_dynamic_producers(ClientId, WolffProducerConfig) of
         {ok, Producers} ->
-            ok = emqx_resource:allocate_resource(InstId, {?kafka_producers, BridgeV2Id}, Producers),
             ok = emqx_resource:allocate_resource(
-                InstId, {?kafka_telemetry_id, BridgeV2Id}, BridgeV2Id
+                ConnResId, {?kafka_producers, ActionResId}, Producers
             ),
-            _ = maybe_install_wolff_telemetry_handlers(BridgeV2Id),
+            ok = emqx_resource:allocate_resource(
+                ConnResId, {?kafka_telemetry_id, ActionResId}, ActionResId
+            ),
+            _ = maybe_install_wolff_telemetry_handlers(ActionResId),
             {ok, #{
                 message_template => compile_message_template(MessageTemplate),
                 kafka_client_id => ClientId,
-                kafka_topic => KafkaTopic,
+                topic_template => TopicTemplate,
+                pre_configured_topics => KafkaTopics,
                 producers => Producers,
-                resource_id => BridgeV2Id,
-                connector_resource_id => InstId,
+                resource_id => ActionResId,
+                connector_resource_id => ConnResId,
                 sync_query_timeout => SyncQueryTimeout,
                 kafka_config => KafkaConfig,
                 headers_tokens => KafkaHeadersTokens,
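In this hunk, the set of topics handed to wolff is derived as follows (a summary sketch of the added logic above, not additional behavior):

    %% fixed topic   -> [KafkaTopic | PreConfiguredTopics]  (always non-empty)
    %% dynamic topic -> PreConfiguredTopics                  (must be non-empty;
    %%                  otherwise the "Either the Kafka topic must be fixed..."
    %%                  error is thrown at producer creation time)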
@@ -169,7 +193,7 @@ create_producers_for_bridge_v2(
         {error, Reason2} ->
             ?SLOG(error, #{
                 msg => "failed_to_start_kafka_producer",
-                instance_id => InstId,
+                instance_id => ConnResId,
                 kafka_client_id => ClientId,
                 kafka_topic => KafkaTopic,
                 reason => Reason2
@@ -264,7 +288,9 @@ remove_producers_for_bridge_v2(
     ClientId = maps:get(?kafka_client_id, AllocatedResources, no_client_id),
     maps:foreach(
         fun
-            ({?kafka_producers, BridgeV2IdCheck}, Producers) when BridgeV2IdCheck =:= BridgeV2Id ->
+            ({?kafka_producers, BridgeV2IdCheck}, Producers) when
+                BridgeV2IdCheck =:= BridgeV2Id
+            ->
                 deallocate_producers(ClientId, Producers);
             ({?kafka_telemetry_id, BridgeV2IdCheck}, TelemetryId) when
                 BridgeV2IdCheck =:= BridgeV2Id
@@ -297,8 +323,10 @@ on_query(
     #{installed_bridge_v2s := BridgeV2Configs} = _ConnectorState
 ) ->
     #{
-        message_template := Template,
+        message_template := MessageTemplate,
+        topic_template := TopicTemplate,
         producers := Producers,
+        pre_configured_topics := PreConfiguredTopics,
         sync_query_timeout := SyncTimeout,
         headers_tokens := KafkaHeadersTokens,
         ext_headers_tokens := KafkaExtHeadersTokens,
@@ -310,7 +338,14 @@ on_query(
         headers_val_encode_mode => KafkaHeadersValEncodeMode
     },
     try
-        KafkaMessage = render_message(Template, KafkaHeaders, Message),
+        KafkaTopic = render_topic(TopicTemplate, Message),
+        case lists:member(KafkaTopic, PreConfiguredTopics) of
+            false ->
+                throw({unknown_topic, KafkaTopic});
+            true ->
+                ok
+        end,
+        KafkaMessage = render_message(MessageTemplate, KafkaHeaders, Message),
         ?tp(
             emqx_bridge_kafka_impl_producer_sync_query,
             #{headers_config => KafkaHeaders, instance_id => InstId}
@@ -318,9 +353,15 @@ on_query(
         emqx_trace:rendered_action_template(MessageTag, #{
             message => KafkaMessage
         }),
-        do_send_msg(sync, KafkaMessage, Producers, SyncTimeout)
+        do_send_msg(sync, KafkaTopic, KafkaMessage, Producers, SyncTimeout)
     catch
-        error:{invalid_partition_count, Count, _Partitioner} ->
+        throw:bad_topic ->
+            ?tp("kafka_producer_failed_to_render_topic", #{}),
+            {error, {unrecoverable_error, failed_to_render_topic}};
+        throw:{unknown_topic, Topic} ->
+            ?tp("kafka_producer_resolved_to_unknown_topic", #{}),
+            {error, {unrecoverable_error, {resolved_to_unknown_topic, Topic}}};
+        throw:#{cause := invalid_partition_count, count := Count} ->
             ?tp("kafka_producer_invalid_partition_count", #{
                 action_id => MessageTag,
                 query_mode => sync
@@ -365,7 +406,9 @@ on_query_async(
 ) ->
     #{
         message_template := Template,
+        topic_template := TopicTemplate,
         producers := Producers,
+        pre_configured_topics := PreConfiguredTopics,
         headers_tokens := KafkaHeadersTokens,
         ext_headers_tokens := KafkaExtHeadersTokens,
         headers_val_encode_mode := KafkaHeadersValEncodeMode
@@ -376,6 +419,13 @@ on_query_async(
         headers_val_encode_mode => KafkaHeadersValEncodeMode
     },
     try
+        KafkaTopic = render_topic(TopicTemplate, Message),
+        case lists:member(KafkaTopic, PreConfiguredTopics) of
+            false ->
+                throw({unknown_topic, KafkaTopic});
+            true ->
+                ok
+        end,
         KafkaMessage = render_message(Template, KafkaHeaders, Message),
         ?tp(
             emqx_bridge_kafka_impl_producer_async_query,
@@ -384,9 +434,15 @@ on_query_async(
         emqx_trace:rendered_action_template(MessageTag, #{
             message => KafkaMessage
         }),
-        do_send_msg(async, KafkaMessage, Producers, AsyncReplyFn)
+        do_send_msg(async, KafkaTopic, KafkaMessage, Producers, AsyncReplyFn)
     catch
-        error:{invalid_partition_count, Count, _Partitioner} ->
+        throw:bad_topic ->
+            ?tp("kafka_producer_failed_to_render_topic", #{}),
+            {error, {unrecoverable_error, failed_to_render_topic}};
+        throw:{unknown_topic, Topic} ->
+            ?tp("kafka_producer_resolved_to_unknown_topic", #{}),
+            {error, {unrecoverable_error, {resolved_to_unknown_topic, Topic}}};
+        throw:#{cause := invalid_partition_count, count := Count} ->
             ?tp("kafka_producer_invalid_partition_count", #{
                 action_id => MessageTag,
                 query_mode => async
@@ -424,9 +480,28 @@ compile_message_template(T) ->
         timestamp => preproc_tmpl(TimestampTemplate)
     }.

+maybe_preproc_topic(Topic) ->
+    Template = emqx_template:parse(Topic),
+    case emqx_template:placeholders(Template) of
+        [] ->
+            {fixed, bin(Topic)};
+        [_ | _] ->
+            {dynamic, Template}
+    end.
+
 preproc_tmpl(Tmpl) ->
     emqx_placeholder:preproc_tmpl(Tmpl).

+render_topic({fixed, KafkaTopic}, _Message) ->
+    KafkaTopic;
+render_topic({dynamic, Template}, Message) ->
+    try
+        iolist_to_binary(emqx_template:render_strict(Template, Message))
+    catch
+        error:_Errors ->
+            throw(bad_topic)
+    end.
+
 render_message(
     #{key := KeyTemplate, value := ValueTemplate, timestamp := TimestampTemplate},
     #{
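A note on the two helpers added in this hunk: maybe_preproc_topic/1 classifies the configured topic as `fixed` when the parsed template has no placeholders and as `dynamic` otherwise, and render_topic/2 either returns the fixed binary or renders the template strictly, turning any render error into throw(bad_topic). A hedged sketch of the expected behavior (illustrative only; the helpers are module-internal and not exported by this diff):

    %% A literal topic parses with no placeholders:
    %%     maybe_preproc_topic(<<"telemetry">>) -> {fixed, <<"telemetry">>}
    %% A templated topic keeps the parsed template around:
    %%     maybe_preproc_topic(<<"pct${.payload.n}">>) -> {dynamic, Template}
    %% Rendering a dynamic template against a message that lacks the referenced
    %% field throws bad_topic, which on_query/on_query_async translate into
    %%     {error, {unrecoverable_error, failed_to_render_topic}}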
@@ -468,9 +543,11 @@ render_timestamp(Template, Message) ->
             erlang:system_time(millisecond)
     end.

-do_send_msg(sync, KafkaMessage, Producers, SyncTimeout) ->
+do_send_msg(sync, KafkaTopic, KafkaMessage, Producers, SyncTimeout) ->
     try
-        {_Partition, _Offset} = wolff:send_sync(Producers, [KafkaMessage], SyncTimeout),
+        {_Partition, _Offset} = wolff:send_sync2(
+            Producers, KafkaTopic, [KafkaMessage], SyncTimeout
+        ),
         ok
     catch
         error:{producer_down, _} = Reason ->
@@ -478,7 +555,7 @@ do_send_msg(sync, KafkaMessage, Producers, SyncTimeout) ->
         error:timeout ->
             {error, timeout}
     end;
-do_send_msg(async, KafkaMessage, Producers, AsyncReplyFn) ->
+do_send_msg(async, KafkaTopic, KafkaMessage, Producers, AsyncReplyFn) ->
     %% * Must be a batch because wolff:send and wolff:send_sync are batch APIs
     %% * Must be a single element batch because wolff books calls, but not batch sizes
     %%   for counters and gauges.
@@ -486,7 +563,9 @@ do_send_msg(async, KafkaMessage, Producers, AsyncReplyFn) ->
     %% The retuned information is discarded here.
     %% If the producer process is down when sending, this function would
     %% raise an error exception which is to be caught by the caller of this callback
-    {_Partition, Pid} = wolff:send(Producers, Batch, {fun ?MODULE:on_kafka_ack/3, [AsyncReplyFn]}),
+    {_Partition, Pid} = wolff:send2(
+        Producers, KafkaTopic, Batch, {fun ?MODULE:on_kafka_ack/3, [AsyncReplyFn]}
+    ),
     %% this Pid is so far never used because Kafka producer is by-passing the buffer worker
     {ok, Pid}.

@@ -527,20 +606,24 @@ on_get_status(
     end.

 on_get_channel_status(
-    _ResId,
-    ChannelId,
+    _ConnResId,
+    ActionResId,
     #{
         client_id := ClientId,
         installed_bridge_v2s := Channels
-    } = _State
+    } = _ConnState
 ) ->
     %% Note: we must avoid returning `?status_disconnected' here. Returning
     %% `?status_disconnected' will make resource manager try to restart the producers /
     %% connector, thus potentially dropping data held in wolff producer's replayq. The
     %% only exception is if the topic does not exist ("unhealthy target").
-    #{kafka_topic := KafkaTopic, partitions_limit := MaxPartitions} = maps:get(ChannelId, Channels),
+    #{
+        pre_configured_topics := PreConfiguredTopics,
+        partitions_limit := MaxPartitions
+    } = maps:get(ActionResId, Channels),
+    [KafkaTopic | _] = PreConfiguredTopics,
     try
-        ok = check_topic_and_leader_connections(ClientId, KafkaTopic, MaxPartitions),
+        ok = check_topic_and_leader_connections(ActionResId, ClientId, KafkaTopic, MaxPartitions),
         ?status_connected
     catch
         throw:{unhealthy_target, Msg} ->
@@ -549,11 +632,11 @@ on_get_channel_status(
             {?status_connecting, {K, E}}
     end.

-check_topic_and_leader_connections(ClientId, KafkaTopic, MaxPartitions) ->
+check_topic_and_leader_connections(ActionResId, ClientId, KafkaTopic, MaxPartitions) ->
     case wolff_client_sup:find_client(ClientId) of
         {ok, Pid} ->
             ok = check_topic_status(ClientId, Pid, KafkaTopic),
-            ok = check_if_healthy_leaders(ClientId, Pid, KafkaTopic, MaxPartitions);
+            ok = check_if_healthy_leaders(ActionResId, ClientId, Pid, KafkaTopic, MaxPartitions);
         {error, #{reason := no_such_client}} ->
             throw(#{
                 reason => cannot_find_kafka_client,
@@ -591,8 +674,10 @@ error_summary(Map, [Error]) ->
 error_summary(Map, [Error | More]) ->
     Map#{first_error => Error, total_errors => length(More) + 1}.

-check_if_healthy_leaders(ClientId, ClientPid, KafkaTopic, MaxPartitions) when is_pid(ClientPid) ->
-    case wolff_client:get_leader_connections(ClientPid, KafkaTopic, MaxPartitions) of
+check_if_healthy_leaders(ActionResId, ClientId, ClientPid, KafkaTopic, MaxPartitions) when
+    is_pid(ClientPid)
+->
+    case wolff_client:get_leader_connections(ClientPid, ActionResId, KafkaTopic, MaxPartitions) of
         {ok, Leaders} ->
             %% Kafka is considered healthy as long as any of the partition leader is reachable.
             case lists:partition(fun({_Partition, Pid}) -> is_alive(Pid) end, Leaders) of
@@ -654,7 +739,7 @@ ssl(#{enable := true} = SSL) ->
 ssl(_) ->
     false.

-producers_config(BridgeType, BridgeName, Input, IsDryRun, BridgeV2Id) ->
+producers_config(BridgeType, BridgeName, Input, IsDryRun, ActionResId) ->
     #{
         max_batch_bytes := MaxBatchBytes,
         compression := Compression,
@@ -696,8 +781,8 @@ producers_config(BridgeType, BridgeName, Input, IsDryRun, BridgeV2Id) ->
         max_batch_bytes => MaxBatchBytes,
         max_send_ahead => MaxInflight - 1,
         compression => Compression,
-        alias => BridgeV2Id,
-        telemetry_meta_data => #{bridge_id => BridgeV2Id},
+        group => ActionResId,
+        telemetry_meta_data => #{bridge_id => ActionResId},
         max_partitions => MaxPartitions
     }.

@@ -773,20 +858,19 @@ handle_telemetry_event(_EventId, _Metrics, _MetaData, _HandlerConfig) ->
 %% Note: don't use the instance/manager ID, as that changes everytime
 %% the bridge is recreated, and will lead to multiplication of
 %% metrics.
--spec telemetry_handler_id(resource_id()) -> binary().
-telemetry_handler_id(ResourceID) ->
-    <<"emqx-bridge-kafka-producer-", ResourceID/binary>>.
+-spec telemetry_handler_id(action_resource_id()) -> binary().
+telemetry_handler_id(ActionResId) ->
+    <<"emqx-bridge-kafka-producer-", ActionResId/binary>>.

-uninstall_telemetry_handlers(ResourceID) ->
-    HandlerID = telemetry_handler_id(ResourceID),
-    telemetry:detach(HandlerID).
+uninstall_telemetry_handlers(TelemetryId) ->
+    telemetry:detach(TelemetryId).

-maybe_install_wolff_telemetry_handlers(ResourceID) ->
+maybe_install_wolff_telemetry_handlers(TelemetryId) ->
     %% Attach event handlers for Kafka telemetry events. If a handler with the
     %% handler id already exists, the attach_many function does nothing
     telemetry:attach_many(
         %% unique handler id
-        telemetry_handler_id(ResourceID),
+        telemetry_handler_id(TelemetryId),
         [
             [wolff, dropped_queue_full],
             [wolff, queuing],
@@ -798,7 +882,7 @@ maybe_install_wolff_telemetry_handlers(ResourceID) ->
         %% wolff producers; otherwise, multiple kafka producer bridges
         %% will install multiple handlers to the same wolff events,
         %% multiplying the metric counts...
-        #{bridge_id => ResourceID}
+        #{bridge_id => TelemetryId}
     ).

 preproc_kafka_headers(HeadersTmpl) when HeadersTmpl =:= <<>>; HeadersTmpl =:= undefined ->
@@ -26,7 +26,12 @@ schema_module() -> emqx_bridge_kafka.
 connector_action_config_to_bridge_v1_config(ConnectorConfig, ActionConfig) ->
     BridgeV1Config1 = maps:remove(<<"connector">>, ActionConfig),
     BridgeV1Config2 = emqx_utils_maps:deep_merge(ConnectorConfig, BridgeV1Config1),
-    emqx_utils_maps:rename(<<"parameters">>, <<"kafka">>, BridgeV1Config2).
+    BridgeV1Config = emqx_utils_maps:rename(<<"parameters">>, <<"kafka">>, BridgeV1Config2),
+    maps:update_with(
+        <<"kafka">>,
+        fun(Params) -> maps:with(v1_parameters(), Params) end,
+        BridgeV1Config
+    ).

 bridge_v1_config_to_action_config(BridgeV1Conf0 = #{<<"producer">> := _}, ConnectorName) ->
     %% Ancient v1 config, when `kafka' key was wrapped by `producer'
|
||||||
%% Internal helper functions
|
%% Internal helper functions
|
||||||
%%------------------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------------------
|
||||||
|
|
||||||
|
v1_parameters() ->
|
||||||
|
[
|
||||||
|
to_bin(K)
|
||||||
|
|| {K, _} <- emqx_bridge_kafka:fields(v1_producer_kafka_opts)
|
||||||
|
].
|
||||||
|
|
||||||
producer_action_field_keys() ->
|
producer_action_field_keys() ->
|
||||||
[
|
[
|
||||||
to_bin(K)
|
to_bin(K)
|
||||||
|
|
|
@@ -23,6 +23,7 @@
 -include_lib("snabbkaffe/include/snabbkaffe.hrl").
 -include_lib("brod/include/brod.hrl").
 -include_lib("emqx_resource/include/emqx_resource.hrl").
+-include_lib("emqx/include/asserts.hrl").

 -import(emqx_common_test_helpers, [on_exit/1]).

@@ -165,6 +166,9 @@ send_message(Type, ActionName) ->

 resolve_kafka_offset() ->
     KafkaTopic = emqx_bridge_kafka_impl_producer_SUITE:test_topic_one_partition(),
+    resolve_kafka_offset(KafkaTopic).
+
+resolve_kafka_offset(KafkaTopic) ->
     Partition = 0,
     Hosts = emqx_bridge_kafka_impl_producer_SUITE:kafka_hosts(),
     {ok, Offset0} = emqx_bridge_kafka_impl_producer_SUITE:resolve_kafka_offset(
@@ -174,11 +178,32 @@ resolve_kafka_offset() ->

 check_kafka_message_payload(Offset, ExpectedPayload) ->
     KafkaTopic = emqx_bridge_kafka_impl_producer_SUITE:test_topic_one_partition(),
+    check_kafka_message_payload(KafkaTopic, Offset, ExpectedPayload).
+
+check_kafka_message_payload(KafkaTopic, Offset, ExpectedPayload) ->
     Partition = 0,
     Hosts = emqx_bridge_kafka_impl_producer_SUITE:kafka_hosts(),
     {ok, {_, [KafkaMsg0]}} = brod:fetch(Hosts, KafkaTopic, Partition, Offset),
     ?assertMatch(#kafka_message{value = ExpectedPayload}, KafkaMsg0).

+ensure_kafka_topic(KafkaTopic) ->
+    TopicConfigs = [
+        #{
+            name => KafkaTopic,
+            num_partitions => 1,
+            replication_factor => 1,
+            assignments => [],
+            configs => []
+        }
+    ],
+    RequestConfig = #{timeout => 5_000},
+    ConnConfig = #{},
+    Endpoints = emqx_bridge_kafka_impl_producer_SUITE:kafka_hosts(),
+    case brod:create_topics(Endpoints, TopicConfigs, RequestConfig, ConnConfig) of
+        ok -> ok;
+        {error, topic_already_exists} -> ok
+    end.
+
 action_config(ConnectorName) ->
     action_config(ConnectorName, _Overrides = #{}).

@@ -728,9 +753,13 @@ t_invalid_partition_count_metrics(Config) ->
         %% Simulate `invalid_partition_count'
         emqx_common_test_helpers:with_mock(
             wolff,
-            send_sync,
-            fun(_Producers, _Msgs, _Timeout) ->
-                error({invalid_partition_count, 0, partitioner})
+            send_sync2,
+            fun(_Producers, _Topic, _Msgs, _Timeout) ->
+                throw(#{
+                    cause => invalid_partition_count,
+                    count => 0,
+                    partitioner => partitioner
+                })
             end,
             fun() ->
                 {{ok, _}, {ok, _}} =
@@ -773,9 +802,13 @@ t_invalid_partition_count_metrics(Config) ->
         %% Simulate `invalid_partition_count'
         emqx_common_test_helpers:with_mock(
             wolff,
-            send,
-            fun(_Producers, _Msgs, _Timeout) ->
-                error({invalid_partition_count, 0, partitioner})
+            send2,
+            fun(_Producers, _Topic, _Msgs, _AckCallback) ->
+                throw(#{
+                    cause => invalid_partition_count,
+                    count => 0,
+                    partitioner => partitioner
+                })
             end,
             fun() ->
                 {{ok, _}, {ok, _}} =
@@ -881,3 +914,196 @@ t_multiple_actions_sharing_topic(Config) ->
         end
     ),
     ok.
+
+%% Smoke tests for using a templated topic and a list of pre-configured kafka topics.
+t_pre_configured_topics(Config) ->
+    Type = proplists:get_value(type, Config, ?TYPE),
+    ConnectorName = proplists:get_value(connector_name, Config, <<"c">>),
+    ConnectorConfig = proplists:get_value(connector_config, Config, connector_config()),
+    ActionName = <<"pre_configured_topics">>,
+    ActionConfig1 = proplists:get_value(action_config, Config, action_config(ConnectorName)),
+    PreConfigureTopic1 = <<"pct1">>,
+    PreConfigureTopic2 = <<"pct2">>,
+    ensure_kafka_topic(PreConfigureTopic1),
+    ensure_kafka_topic(PreConfigureTopic2),
+    ActionConfig = emqx_bridge_v2_testlib:parse_and_check(
+        action,
+        Type,
+        ActionName,
+        emqx_utils_maps:deep_merge(
+            ActionConfig1,
+            #{
+                <<"parameters">> => #{
+                    <<"topic">> => <<"pct${.payload.n}">>,
+                    <<"message">> => #{
+                        <<"key">> => <<"${.clientid}">>,
+                        <<"value">> => <<"${.payload.p}">>
+                    },
+                    <<"pre_configured_topics">> => [
+                        #{<<"topic">> => PreConfigureTopic1},
+                        #{<<"topic">> => PreConfigureTopic2}
+                    ]
+                }
+            }
+        )
+    ),
+    ?check_trace(
+        #{timetrap => 7_000},
+        begin
+            ConnectorParams = [
+                {connector_config, ConnectorConfig},
+                {connector_name, ConnectorName},
+                {connector_type, Type}
+            ],
+            ActionParams = [
+                {action_config, ActionConfig},
+                {action_name, ActionName},
+                {action_type, Type}
+            ],
+            {ok, {{_, 201, _}, _, #{}}} =
+                emqx_bridge_v2_testlib:create_connector_api(ConnectorParams),
+
+            {ok, {{_, 201, _}, _, #{}}} =
+                emqx_bridge_v2_testlib:create_action_api(ActionParams),
+            RuleTopic = <<"pct">>,
+            {ok, _} = emqx_bridge_v2_testlib:create_rule_and_action_http(
+                Type,
+                RuleTopic,
+                [
+                    {bridge_name, ActionName}
+                ],
+                #{
+                    sql =>
+                        <<"select *, json_decode(payload) as payload from \"", RuleTopic/binary,
+                            "\" ">>
+                }
+            ),
+            ?assertStatusAPI(Type, ActionName, <<"connected">>),
+
+            HandlerId = ?FUNCTION_NAME,
+            TestPid = self(),
+            telemetry:attach_many(
+                HandlerId,
+                emqx_resource_metrics:events(),
+                fun(EventName, Measurements, Metadata, _Config) ->
+                    Data = #{
+                        name => EventName,
+                        measurements => Measurements,
+                        metadata => Metadata
+                    },
+                    TestPid ! {telemetry, Data},
+                    ok
+                end,
+                unused_config
+            ),
+            on_exit(fun() -> telemetry:detach(HandlerId) end),
+
+            {ok, C} = emqtt:start_link(#{}),
+            {ok, _} = emqtt:connect(C),
+            Payload = fun(Map) -> emqx_utils_json:encode(Map) end,
+            Offset1 = resolve_kafka_offset(PreConfigureTopic1),
+            Offset2 = resolve_kafka_offset(PreConfigureTopic2),
+            {ok, _} = emqtt:publish(C, RuleTopic, Payload(#{n => 1, p => <<"p1">>}), [{qos, 1}]),
+            {ok, _} = emqtt:publish(C, RuleTopic, Payload(#{n => 2, p => <<"p2">>}), [{qos, 1}]),
+
+            check_kafka_message_payload(PreConfigureTopic1, Offset1, <<"p1">>),
+            check_kafka_message_payload(PreConfigureTopic2, Offset2, <<"p2">>),
+
+            ActionId = emqx_bridge_v2:id(Type, ActionName),
+            ?assertEqual(2, emqx_resource_metrics:matched_get(ActionId)),
+            ?assertEqual(2, emqx_resource_metrics:success_get(ActionId)),
+            ?assertEqual(0, emqx_resource_metrics:queuing_get(ActionId)),
+
+            ?assertReceive(
+                {telemetry, #{
+                    measurements := #{gauge_set := _},
+                    metadata := #{worker_id := _, resource_id := ActionId}
+                }}
+            ),
+
+            %% If there isn't enough information in the context to resolve to a topic, it
+            %% should be an unrecoverable error.
+            ?assertMatch(
+                {_, {ok, _}},
+                ?wait_async_action(
+                    emqtt:publish(C, RuleTopic, Payload(#{not_enough => <<"info">>}), [{qos, 1}]),
+                    #{?snk_kind := "kafka_producer_failed_to_render_topic"}
+                )
+            ),
+
+            %% If it's possible to render the topic, but it isn't in the pre-configured
+            %% list, it should be an unrecoverable error.
+            ?assertMatch(
+                {_, {ok, _}},
+                ?wait_async_action(
+                    emqtt:publish(C, RuleTopic, Payload(#{n => 99}), [{qos, 1}]),
+                    #{?snk_kind := "kafka_producer_resolved_to_unknown_topic"}
+                )
+            ),
+
+            ok
+        end,
+        []
+    ),
+    ok.
+
+%% Checks that creating an action with templated topic and no pre-configured kafka topics
+%% throws.
+t_templated_topic_and_no_pre_configured_topics(Config) ->
+    Type = proplists:get_value(type, Config, ?TYPE),
+    ConnectorName = proplists:get_value(connector_name, Config, <<"c">>),
+    ConnectorConfig = proplists:get_value(connector_config, Config, connector_config()),
+    ActionName = <<"bad_pre_configured_topics">>,
+    ActionConfig1 = proplists:get_value(action_config, Config, action_config(ConnectorName)),
+    ActionConfig = emqx_bridge_v2_testlib:parse_and_check(
+        action,
+        Type,
+        ActionName,
+        emqx_utils_maps:deep_merge(
+            ActionConfig1,
+            #{
+                <<"parameters">> => #{
+                    <<"topic">> => <<"pct${.payload.n}">>,
+                    <<"pre_configured_topics">> => []
+                }
+            }
+        )
+    ),
+    ?check_trace(
+        #{timetrap => 7_000},
+        begin
+            ConnectorParams = [
+                {connector_config, ConnectorConfig},
+                {connector_name, ConnectorName},
+                {connector_type, Type}
+            ],
+            ActionParams = [
+                {action_config, ActionConfig},
+                {action_name, ActionName},
+                {action_type, Type}
+            ],
+            {ok, {{_, 201, _}, _, #{}}} =
+                emqx_bridge_v2_testlib:create_connector_api(ConnectorParams),
+
+            {ok, {{_, 201, _}, _, #{}}} =
+                emqx_bridge_v2_testlib:create_action_api(ActionParams),
+
+            ?assertMatch(
+                {ok,
+                    {{_, 200, _}, _, #{
+                        <<"status_reason">> :=
+                            <<
+                                "Either the Kafka topic must be fixed (not a template),"
+                                " or at least one pre-defined topic must be set."
+                            >>,
+                        <<"status">> := <<"disconnected">>,
+                        <<"node_status">> := [#{<<"status">> := <<"disconnected">>}]
+                    }}},
+                emqx_bridge_v2_testlib:get_bridge_api(Type, ActionName)
+            ),
+
+            ok
+        end,
+        []
+    ),
+    ok.
@@ -0,0 +1 @@
+Added the possibility to configure a list of predefined Kafka topics for Kafka producer actions, and also to use templates to define the destination Kafka topic.
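The two failure modes mentioned in the changelog and in the i18n descriptions below map to distinct unrecoverable errors in on_query/on_query_async above (a summary sketch of behavior already shown in this diff, not new behavior):

    %% Topic template could not be rendered from the message context:
    %%     {error, {unrecoverable_error, failed_to_render_topic}}
    %% Topic rendered fine, but is not in the pre_configured_topics allow-list:
    %%     {error, {unrecoverable_error, {resolved_to_unknown_topic, Topic}}}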
@@ -69,7 +69,7 @@ producer_kafka_opts.label:
 """Azure Event Hubs Producer"""

 kafka_topic.desc:
-"""Event Hubs name"""
+"""Event Hubs name. Supports templates (e.g.: `t-${payload.t}`)."""

 kafka_topic.label:
 """Event Hubs Name"""
@@ -350,4 +350,14 @@ Setting this to a value which is greater than the total number of partitions in
 partitions_limit.label:
 """Max Partitions"""

+producer_pre_configured_topics.label:
+"""Pre-configured Event Hubs"""
+producer_pre_configured_topics.desc:
+"""A list of pre-configured event hubs to be used when using templates to define outgoing topics. If the topic template fails to resolve to a value due to missing data in the incoming message, or if it resolves to a topic that is not contained in this list, then publishing will fail."""
+
+pre_configured_topic.label:
+"""Event Hubs Name"""
+pre_configured_topic.desc:
+"""Event Hubs name"""
+
 }
@@ -69,10 +69,10 @@ producer_kafka_opts.label:
 """Confluent Producer"""

 kafka_topic.desc:
-"""Event Hub name"""
+"""Kafka topic name. Supports templates (e.g.: `t-${payload.t}`)."""

 kafka_topic.label:
-"""Event Hub Name"""
+"""Kafka Topic Name"""

 kafka_message_timestamp.desc:
 """Which timestamp to use. The timestamp is expected to be a millisecond precision Unix epoch which can be in string format, e.g. <code>1661326462115</code> or <code>'1661326462115'</code>. When the desired data field for this template is not found, or if the found data is not a valid integer, the current system timestamp will be used."""
@@ -350,4 +350,14 @@ server_name_indication.desc:
 server_name_indication.label:
 """SNI"""

+producer_pre_configured_topics.label:
+"""Pre-configured Topics"""
+producer_pre_configured_topics.desc:
+"""A list of pre-configured topics to be used when using templates to define outgoing topics. If the topic template fails to resolve to a value due to missing data in the incoming message, or if it resolves to a topic that is not contained in this list, then publishing will fail."""
+
+pre_configured_topic.label:
+"""Kafka Topic Name"""
+pre_configured_topic.desc:
+"""Kafka topic name"""
+
 }
@@ -81,7 +81,7 @@ producer_kafka_opts.label:
 """Kafka Producer"""

 kafka_topic.desc:
-"""Kafka topic name"""
+"""Kafka topic name. Supports templates (e.g.: `t-${payload.t}`)."""

 kafka_topic.label:
 """Kafka Topic Name"""
@@ -446,5 +446,14 @@ server_name_indication.desc:
 server_name_indication.label:
 """SNI"""

+producer_pre_configured_topics.label:
+"""Pre-configured Topics"""
+producer_pre_configured_topics.desc:
+"""A list of pre-configured topics to be used when using templates to define outgoing topics. If the topic template fails to resolve to a value due to missing data in the incoming message, or if it resolves to a topic that is not contained in this list, then publishing will fail."""
+
+pre_configured_topic.label:
+"""Kafka Topic Name"""
+pre_configured_topic.desc:
+"""Kafka topic name"""
+
 }