test: add Kafka producer bridge test case for query_mode parameter

This commit is contained in:
Kjell Winblad 2023-06-08 15:43:41 +02:00
parent ed9e29e769
commit 6f2271e9f0
3 changed files with 62 additions and 27 deletions

View File

@ -204,6 +204,7 @@ on_query(
query_mode_sync_timeout := SyncTimeout
}
) ->
?tp(emqx_bridge_kafka_impl_producer_sync_query, #{}),
KafkaMessage = render_message(Template, Message),
try
{_Partition, _Offset} = wolff:send_sync(Producers, [KafkaMessage], SyncTimeout),
@ -227,6 +228,7 @@ on_query_async(
AsyncReplyFn,
#{message_template := Template, producers := Producers}
) ->
?tp(emqx_bridge_kafka_impl_producer_async_query, #{}),
KafkaMessage = render_message(Template, Message),
%% * Must be a batch because wolff:send and wolff:send_sync are batch APIs
%% * Must be a single element batch because wolff books calls, but not batch sizes

View File

@ -120,6 +120,34 @@ set_special_configs(emqx_dashboard) ->
ok;
set_special_configs(_) ->
ok.
%%------------------------------------------------------------------------------
%% Test case for the query_mode parameter
%%------------------------------------------------------------------------------
t_query_mode(CtConfig) ->
%% We need this because on_query_async is in a different group
CtConfig1 = [{query_api, none} | CtConfig],
?check_trace(
begin
publish_with_config_template_parameters(CtConfig1, #{"query_mode" => "sync"})
end,
fun(RunStageResult, Trace) ->
%% We should have a sync Snabbkaffe trace
?assertMatch([_], ?of_kind(emqx_bridge_kafka_impl_producer_sync_query, Trace))
end
),
?check_trace(
begin
publish_with_config_template_parameters(CtConfig1, #{"query_mode" => "async"})
end,
fun(RunStageResult, Trace) ->
%% We should have a sync Snabbkaffe trace
?assertMatch([_], ?of_kind(emqx_bridge_kafka_impl_producer_async_query, Trace))
end
),
ok.
%%------------------------------------------------------------------------------
%% Test cases for all combinations of SSL, no SSL and authentication types
%%------------------------------------------------------------------------------
@ -473,6 +501,16 @@ do_send(Ref, Config, ResourceId, Msg, State) when is_list(Config) ->
ok
end.
publish_with_config_template_parameters(CtConfig, ConfigTemplateParameters) ->
publish_helper(
CtConfig,
#{
auth_settings => "none",
ssl_settings => #{}
},
ConfigTemplateParameters
).
publish_with_and_without_ssl(CtConfig, AuthSettings) ->
publish_with_and_without_ssl(CtConfig, AuthSettings, #{}).
@ -537,21 +575,25 @@ publish_helper(
{ok, _} = emqx_bridge:create(
<<?BRIDGE_TYPE>>, list_to_binary(Name), Conf
),
Time = erlang:unique_integer(),
BinTime = integer_to_binary(Time),
Partition = 0,
Msg = #{
clientid => BinTime,
payload => <<"payload">>,
timestamp => Time
},
{ok, Offset0} = resolve_kafka_offset(kafka_hosts(), KafkaTopic, Partition),
ct:pal("base offset before testing ~p", [Offset0]),
{ok, _Group, #{state := State}} = emqx_resource:get_instance(InstId),
ok = send(CtConfig, InstId, Msg, State),
{ok, {_, [KafkaMsg0]}} = brod:fetch(kafka_hosts(), KafkaTopic, Partition, Offset0),
?assertMatch(#kafka_message{key = BinTime}, KafkaMsg0),
case proplists:get_value(query_api, CtConfig) of
none ->
ok;
_ ->
Time = erlang:unique_integer(),
BinTime = integer_to_binary(Time),
Msg = #{
clientid => BinTime,
payload => <<"payload">>,
timestamp => Time
},
{ok, Offset0} = resolve_kafka_offset(kafka_hosts(), KafkaTopic, Partition),
ct:pal("base offset before testing ~p", [Offset0]),
{ok, _Group, #{state := State}} = emqx_resource:get_instance(InstId),
ok = send(CtConfig, InstId, Msg, State),
{ok, {_, [KafkaMsg0]}} = brod:fetch(kafka_hosts(), KafkaTopic, Partition, Offset0),
?assertMatch(#kafka_message{key = BinTime}, KafkaMsg0)
end,
%% test that it forwards from local mqtt topic as well
{ok, Offset1} = resolve_kafka_offset(kafka_hosts(), KafkaTopic, Partition),
ct:pal("base offset before testing (2) ~p", [Offset1]),
@ -596,13 +638,15 @@ hocon_config(Args) ->
AuthConfRendered = bbmustache:render(AuthTemplate, AuthConf),
SSLConf = maps:get("ssl", Args, #{}),
SSLTemplate = iolist_to_binary(hocon_config_template_ssl(SSLConf)),
QueryMode = maps:get("query_mode", Args, <<"async">>),
SSLConfRendered = bbmustache:render(SSLTemplate, SSLConf),
Hocon = bbmustache:render(
iolist_to_binary(hocon_config_template()),
Args#{
"authentication" => AuthConfRendered,
"bridge_name" => Name,
"ssl" => SSLConfRendered
"ssl" => SSLConfRendered,
"query_mode" => QueryMode
}
),
Hocon.
@ -630,6 +674,7 @@ bridges.kafka.{{ bridge_name }} {
}
partition_strategy = {{ partition_strategy }}
topic = \"{{ kafka_topic }}\"
query_mode = {{ query_mode }}
}
metadata_request_timeout = 5s
min_metadata_refresh_interval = 3s

View File

@ -364,18 +364,6 @@ query_mode.desc:
query_mode.label:
"""Query mode"""
resource_opts.desc:
"""Resource options."""
resource_opts.label:
"""Resource Options"""
resource_opts_fields.desc:
"""Resource options."""
resource_opts_fields.label:
"""Resource Options"""
query_mode_sync_timeout.desc:
"""This parameter defines the timeout limit for synchronous queries. It applies only when the bridge query mode is configured to 'sync'."""