From 0381ac0410bdcb41b139dfeb26cd66cc6cd5ce17 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Thu, 24 Aug 2023 10:10:50 -0300 Subject: [PATCH 1/2] fix(kafka_producer): use correct timestamp template field Fixes https://emqx.atlassian.net/browse/EMQX-10847 --- .../src/emqx_bridge_kafka_impl_producer.erl | 2 +- .../emqx_bridge_kafka_impl_producer_SUITE.erl | 46 ++++++++++++++++++- changes/ee/fix-11513.en.md | 1 + 3 files changed, 47 insertions(+), 2 deletions(-) create mode 100644 changes/ee/fix-11513.en.md diff --git a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl index ea6666ea0..d8c934e6b 100644 --- a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl +++ b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl @@ -286,7 +286,7 @@ on_query_async( compile_message_template(T) -> KeyTemplate = maps:get(key, T, <<"${.clientid}">>), ValueTemplate = maps:get(value, T, <<"${.}">>), - TimestampTemplate = maps:get(value, T, <<"${.timestamp}">>), + TimestampTemplate = maps:get(timestamp, T, <<"${.timestamp}">>), #{ key => preproc_tmpl(KeyTemplate), value => preproc_tmpl(ValueTemplate), diff --git a/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_producer_SUITE.erl b/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_producer_SUITE.erl index d93b6dd7d..b920b39ae 100644 --- a/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_producer_SUITE.erl +++ b/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_producer_SUITE.erl @@ -470,7 +470,51 @@ t_failed_creation_then_fix(Config) -> delete_all_bridges(), ok. -t_table_removed(_Config) -> +t_custom_timestamp(_Config) -> + HostsString = kafka_hosts_string_sasl(), + AuthSettings = valid_sasl_plain_settings(), + Hash = erlang:phash2([HostsString, ?FUNCTION_NAME]), + Type = ?BRIDGE_TYPE, + Name = "kafka_bridge_name_" ++ erlang:integer_to_list(Hash), + ResourceId = emqx_bridge_resource:resource_id(Type, Name), + KafkaTopic = "test-topic-one-partition", + MQTTTopic = <<"t/local/kafka">>, + emqx:subscribe(MQTTTopic), + Conf0 = config(#{ + "authentication" => AuthSettings, + "kafka_hosts_string" => HostsString, + "local_topic" => MQTTTopic, + "kafka_topic" => KafkaTopic, + "instance_id" => ResourceId, + "ssl" => #{} + }), + Conf = emqx_utils_maps:deep_put( + [<<"kafka">>, <<"message">>, <<"timestamp">>], + Conf0, + <<"123">> + ), + {ok, _} = emqx_bridge:create(Type, erlang:list_to_atom(Name), Conf), + {ok, Offset} = resolve_kafka_offset(kafka_hosts(), KafkaTopic, 0), + ct:pal("base offset before testing ~p", [Offset]), + Time = erlang:unique_integer(), + BinTime = integer_to_binary(Time), + Msg = #{ + clientid => BinTime, + payload => <<"payload">>, + timestamp => Time + }, + emqx:publish(emqx_message:make(MQTTTopic, emqx_utils_json:encode(Msg))), + {ok, {_, [KafkaMsg]}} = + ?retry( + _Interval = 500, + _NAttempts = 20, + {ok, {_, [_]}} = brod:fetch(kafka_hosts(), KafkaTopic, _Partition = 0, Offset) + ), + ?assertMatch(#kafka_message{ts = 123, ts_type = create}, KafkaMsg), + delete_all_bridges(), + ok. + +t_nonexistent_topic(_Config) -> HostsString = kafka_hosts_string_sasl(), AuthSettings = valid_sasl_plain_settings(), Hash = erlang:phash2([HostsString, ?FUNCTION_NAME]), diff --git a/changes/ee/fix-11513.en.md b/changes/ee/fix-11513.en.md new file mode 100644 index 000000000..51d953933 --- /dev/null +++ b/changes/ee/fix-11513.en.md @@ -0,0 +1 @@ +Fixed a bug which prevented the Kafka Producer bridge from using the correct template for the `timestamp` field. From 016ae0524f79278d039789f1e93a9749cef58c55 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Thu, 24 Aug 2023 11:14:02 -0300 Subject: [PATCH 2/2] fix(aeh_producer): remove timestamp template field MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Fixes https://emqx.atlassian.net/browse/EMQX-10847 Checking the whole Kafka message from AEH, it seems like the timestamp type is append, which means that it’s the broker who controls the timestamp, and the timestamp defined by the producer is ignored. Ref: https://kafka.apache.org/documentation/#brokerconfigs_log.message.timestamp.type Example message consumed from AEH: ``` %{ "headers" => %{}, "key" => "", "offset" => 4, "topic" => "test0", "ts" => 1692879703006, "ts_type" => "append", "value" => "{\"username\":\"undefined\",\"topic\":\"t/aeh/produ\",\"timestamp\":1692879692189,\"qos\":0,\"publish_received_at\":1692879692189,\"pub_props\":{\"User-Property\":{}},\"peerhost\":\"undefined\",\"payload\":\"aaaa\",\"node\":\"emqx@127.0.0.1\",\"metadata\":{\"rule_id\":\"rule_aehp\"},\"id\":\"000603AA44B34E08F4AF000006E30003\",\"flags\":{},\"event\":\"message.publish\",\"clientid\":\"undefined\"}" } ``` Note the ts_type above is append. Example message from a Kafka broker whose ts type is create: ``` %{ "headers" => %{}, "key" => "", "offset" => 4, "topic" => "test-topic-three-partitions", "ts" => 1692881883668, "ts_type" => "create", "value" => "{\"username\":\"undefined\",\"topic\":\"t/kafka/produ\",\"timestamp\":1692881883668,\"qos\":0,\"publish_received_at\":1692881883668,\"pub_props\":{\"User-Property\":{}},\"peerhost\":\"undefined\",\"payload\":\"aaaaaa\",\"node\":\"emqx@127.0.0.1\",\"id\":\"000603AAC7529FEEF4AC000007050000\",\"flags\":{},\"event\":\"message.publish\",\"clientid\":\"undefined\"}" } ``` Unfortunately, I couldn’t find anywhere in AEH where that configuration could be changed. --- .../src/emqx_bridge_azure_event_hub.app.src | 2 +- .../src/emqx_bridge_azure_event_hub.erl | 6 ++++++ .../test/emqx_bridge_azure_event_hub_producer_SUITE.erl | 5 ----- .../test/emqx_bridge_azure_event_hub_tests.erl | 1 - 4 files changed, 7 insertions(+), 7 deletions(-) diff --git a/apps/emqx_bridge_azure_event_hub/src/emqx_bridge_azure_event_hub.app.src b/apps/emqx_bridge_azure_event_hub/src/emqx_bridge_azure_event_hub.app.src index e29e9c83a..43033b657 100644 --- a/apps/emqx_bridge_azure_event_hub/src/emqx_bridge_azure_event_hub.app.src +++ b/apps/emqx_bridge_azure_event_hub/src/emqx_bridge_azure_event_hub.app.src @@ -1,6 +1,6 @@ {application, emqx_bridge_azure_event_hub, [ {description, "EMQX Enterprise Azure Event Hub Bridge"}, - {vsn, "0.1.1"}, + {vsn, "0.1.2"}, {registered, []}, {applications, [ kernel, diff --git a/apps/emqx_bridge_azure_event_hub/src/emqx_bridge_azure_event_hub.erl b/apps/emqx_bridge_azure_event_hub/src/emqx_bridge_azure_event_hub.erl index 2d6343b74..c563a35d8 100644 --- a/apps/emqx_bridge_azure_event_hub/src/emqx_bridge_azure_event_hub.erl +++ b/apps/emqx_bridge_azure_event_hub/src/emqx_bridge_azure_event_hub.erl @@ -64,6 +64,10 @@ fields(producer_kafka_opts) -> kafka_producer_overrides() ), override_documentations(Fields); +fields(kafka_message) -> + Fields0 = emqx_bridge_kafka:fields(kafka_message), + Fields = proplists:delete(timestamp, Fields0), + override_documentations(Fields); fields(Method) -> Fields = emqx_bridge_kafka:fields(Method), override_documentations(Fields). @@ -85,6 +89,7 @@ desc(Name) -> struct_names() -> [ auth_username_password, + kafka_message, producer_kafka_opts ]. @@ -245,6 +250,7 @@ kafka_producer_overrides() -> default => no_compression, importance => ?IMPORTANCE_HIDDEN }), + message => mk(ref(kafka_message), #{}), required_acks => mk(enum([all_isr, leader_only]), #{default => all_isr}) }. diff --git a/apps/emqx_bridge_azure_event_hub/test/emqx_bridge_azure_event_hub_producer_SUITE.erl b/apps/emqx_bridge_azure_event_hub/test/emqx_bridge_azure_event_hub_producer_SUITE.erl index af4b87718..e77d724d0 100644 --- a/apps/emqx_bridge_azure_event_hub/test/emqx_bridge_azure_event_hub_producer_SUITE.erl +++ b/apps/emqx_bridge_azure_event_hub/test/emqx_bridge_azure_event_hub_producer_SUITE.erl @@ -65,10 +65,6 @@ init_per_suite(Config) -> end. end_per_suite(Config) -> - %% emqx_mgmt_api_test_util:end_suite(), - %% ok = emqx_common_test_helpers:stop_apps([emqx_conf]), - %% ok = emqx_connector_test_helpers:stop_apps([emqx_bridge, emqx_resource, emqx_rule_engine]), - %% _ = application:stop(emqx_connector), Apps = ?config(tc_apps, Config), emqx_cth_suite:stop(Apps), ok. @@ -145,7 +141,6 @@ bridge_config(TestCase, Config) -> <<"message">> => #{ <<"key">> => <<"${.clientid}">>, - <<"timestamp">> => <<"${.timestamp}">>, <<"value">> => <<"${.}">> }, <<"partition_count_refresh_interval">> => <<"60s">>, diff --git a/apps/emqx_bridge_azure_event_hub/test/emqx_bridge_azure_event_hub_tests.erl b/apps/emqx_bridge_azure_event_hub/test/emqx_bridge_azure_event_hub_tests.erl index d624421c6..92d268d20 100644 --- a/apps/emqx_bridge_azure_event_hub/test/emqx_bridge_azure_event_hub_tests.erl +++ b/apps/emqx_bridge_azure_event_hub/test/emqx_bridge_azure_event_hub_tests.erl @@ -33,7 +33,6 @@ bridges.azure_event_hub_producer.my_producer { max_inflight = 10 message { key = \"${.clientid}\" - timestamp = \"${.timestamp}\" value = \"${.}\" } partition_count_refresh_interval = 60s