diff --git a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl index eee0cdd99..b5799a91a 100644 --- a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl +++ b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl @@ -204,6 +204,7 @@ on_query( query_mode_sync_timeout := SyncTimeout } ) -> + ?tp(emqx_bridge_kafka_impl_producer_sync_query, #{}), KafkaMessage = render_message(Template, Message), try {_Partition, _Offset} = wolff:send_sync(Producers, [KafkaMessage], SyncTimeout), @@ -227,6 +228,7 @@ on_query_async( AsyncReplyFn, #{message_template := Template, producers := Producers} ) -> + ?tp(emqx_bridge_kafka_impl_producer_async_query, #{}), KafkaMessage = render_message(Template, Message), %% * Must be a batch because wolff:send and wolff:send_sync are batch APIs %% * Must be a single element batch because wolff books calls, but not batch sizes diff --git a/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_producer_SUITE.erl b/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_producer_SUITE.erl index 378dda543..e52f5b07b 100644 --- a/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_producer_SUITE.erl +++ b/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_producer_SUITE.erl @@ -120,6 +120,34 @@ set_special_configs(emqx_dashboard) -> ok; set_special_configs(_) -> ok. + +%%------------------------------------------------------------------------------ +%% Test case for the query_mode parameter +%%------------------------------------------------------------------------------ + +t_query_mode(CtConfig) -> + %% We need this because on_query_async is in a different group + CtConfig1 = [{query_api, none} | CtConfig], + ?check_trace( + begin + publish_with_config_template_parameters(CtConfig1, #{"query_mode" => "sync"}) + end, + fun(RunStageResult, Trace) -> + %% We should have a sync Snabbkaffe trace + ?assertMatch([_], ?of_kind(emqx_bridge_kafka_impl_producer_sync_query, Trace)) + end + ), + ?check_trace( + begin + publish_with_config_template_parameters(CtConfig1, #{"query_mode" => "async"}) + end, + fun(RunStageResult, Trace) -> + %% We should have a sync Snabbkaffe trace + ?assertMatch([_], ?of_kind(emqx_bridge_kafka_impl_producer_async_query, Trace)) + end + ), + ok. + %%------------------------------------------------------------------------------ %% Test cases for all combinations of SSL, no SSL and authentication types %%------------------------------------------------------------------------------ @@ -473,6 +501,16 @@ do_send(Ref, Config, ResourceId, Msg, State) when is_list(Config) -> ok end. +publish_with_config_template_parameters(CtConfig, ConfigTemplateParameters) -> + publish_helper( + CtConfig, + #{ + auth_settings => "none", + ssl_settings => #{} + }, + ConfigTemplateParameters + ). + publish_with_and_without_ssl(CtConfig, AuthSettings) -> publish_with_and_without_ssl(CtConfig, AuthSettings, #{}). @@ -537,21 +575,25 @@ publish_helper( {ok, _} = emqx_bridge:create( <>, list_to_binary(Name), Conf ), - Time = erlang:unique_integer(), - BinTime = integer_to_binary(Time), Partition = 0, - Msg = #{ - clientid => BinTime, - payload => <<"payload">>, - timestamp => Time - }, - {ok, Offset0} = resolve_kafka_offset(kafka_hosts(), KafkaTopic, Partition), - ct:pal("base offset before testing ~p", [Offset0]), - {ok, _Group, #{state := State}} = emqx_resource:get_instance(InstId), - ok = send(CtConfig, InstId, Msg, State), - {ok, {_, [KafkaMsg0]}} = brod:fetch(kafka_hosts(), KafkaTopic, Partition, Offset0), - ?assertMatch(#kafka_message{key = BinTime}, KafkaMsg0), - + case proplists:get_value(query_api, CtConfig) of + none -> + ok; + _ -> + Time = erlang:unique_integer(), + BinTime = integer_to_binary(Time), + Msg = #{ + clientid => BinTime, + payload => <<"payload">>, + timestamp => Time + }, + {ok, Offset0} = resolve_kafka_offset(kafka_hosts(), KafkaTopic, Partition), + ct:pal("base offset before testing ~p", [Offset0]), + {ok, _Group, #{state := State}} = emqx_resource:get_instance(InstId), + ok = send(CtConfig, InstId, Msg, State), + {ok, {_, [KafkaMsg0]}} = brod:fetch(kafka_hosts(), KafkaTopic, Partition, Offset0), + ?assertMatch(#kafka_message{key = BinTime}, KafkaMsg0) + end, %% test that it forwards from local mqtt topic as well {ok, Offset1} = resolve_kafka_offset(kafka_hosts(), KafkaTopic, Partition), ct:pal("base offset before testing (2) ~p", [Offset1]), @@ -596,13 +638,15 @@ hocon_config(Args) -> AuthConfRendered = bbmustache:render(AuthTemplate, AuthConf), SSLConf = maps:get("ssl", Args, #{}), SSLTemplate = iolist_to_binary(hocon_config_template_ssl(SSLConf)), + QueryMode = maps:get("query_mode", Args, <<"async">>), SSLConfRendered = bbmustache:render(SSLTemplate, SSLConf), Hocon = bbmustache:render( iolist_to_binary(hocon_config_template()), Args#{ "authentication" => AuthConfRendered, "bridge_name" => Name, - "ssl" => SSLConfRendered + "ssl" => SSLConfRendered, + "query_mode" => QueryMode } ), Hocon. @@ -630,6 +674,7 @@ bridges.kafka.{{ bridge_name }} { } partition_strategy = {{ partition_strategy }} topic = \"{{ kafka_topic }}\" + query_mode = {{ query_mode }} } metadata_request_timeout = 5s min_metadata_refresh_interval = 3s diff --git a/rel/i18n/emqx_bridge_kafka.hocon b/rel/i18n/emqx_bridge_kafka.hocon index 740d94984..86acd81fe 100644 --- a/rel/i18n/emqx_bridge_kafka.hocon +++ b/rel/i18n/emqx_bridge_kafka.hocon @@ -364,18 +364,6 @@ query_mode.desc: query_mode.label: """Query mode""" -resource_opts.desc: -"""Resource options.""" - -resource_opts.label: -"""Resource Options""" - -resource_opts_fields.desc: -"""Resource options.""" - -resource_opts_fields.label: -"""Resource Options""" - query_mode_sync_timeout.desc: """This parameter defines the timeout limit for synchronous queries. It applies only when the bridge query mode is configured to 'sync'."""