fix(aeh_producer): remove timestamp template field

Fixes https://emqx.atlassian.net/browse/EMQX-10847

Inspecting the full Kafka message consumed from AEH, the timestamp type appears to be `append`,
which means that the broker controls the timestamp, and any timestamp defined by
the producer is ignored.

Ref: https://kafka.apache.org/documentation/#brokerconfigs_log.message.timestamp.type

Example message consumed from AEH:
```
%{
  "headers" => %{},
  "key" => "",
  "offset" => 4,
  "topic" => "test0",
  "ts" => 1692879703006,
  "ts_type" => "append",
  "value" => "{\"username\":\"undefined\",\"topic\":\"t/aeh/produ\",\"timestamp\":1692879692189,\"qos\":0,\"publish_received_at\":1692879692189,\"pub_props\":{\"User-Property\":{}},\"peerhost\":\"undefined\",\"payload\":\"aaaa\",\"node\":\"emqx@127.0.0.1\",\"metadata\":{\"rule_id\":\"rule_aehp\"},\"id\":\"000603AA44B34E08F4AF000006E30003\",\"flags\":{},\"event\":\"message.publish\",\"clientid\":\"undefined\"}"
}
```

Note that the `ts_type` above is `append`.

Example message from a Kafka broker whose timestamp type is `create`:
```
%{
  "headers" => %{},
  "key" => "",
  "offset" => 4,
  "topic" => "test-topic-three-partitions",
  "ts" => 1692881883668,
  "ts_type" => "create",
  "value" => "{\"username\":\"undefined\",\"topic\":\"t/kafka/produ\",\"timestamp\":1692881883668,\"qos\":0,\"publish_received_at\":1692881883668,\"pub_props\":{\"User-Property\":{}},\"peerhost\":\"undefined\",\"payload\":\"aaaaaa\",\"node\":\"emqx@127.0.0.1\",\"id\":\"000603AAC7529FEEF4AC000007050000\",\"flags\":{},\"event\":\"message.publish\",\"clientid\":\"undefined\"}"
}
```

Unfortunately, I couldn’t find anywhere in AEH where that configuration could be changed.
This commit is contained in:
Thales Macedo Garitezi 2023-08-24 11:14:02 -03:00
parent 0381ac0410
commit 016ae0524f
4 changed files with 7 additions and 7 deletions

View File

@@ -1,6 +1,6 @@
{application, emqx_bridge_azure_event_hub, [
{description, "EMQX Enterprise Azure Event Hub Bridge"},
{vsn, "0.1.1"},
{vsn, "0.1.2"},
{registered, []},
{applications, [
kernel,

View File

@@ -64,6 +64,10 @@ fields(producer_kafka_opts) ->
kafka_producer_overrides()
),
override_documentations(Fields);
fields(kafka_message) ->
Fields0 = emqx_bridge_kafka:fields(kafka_message),
Fields = proplists:delete(timestamp, Fields0),
override_documentations(Fields);
fields(Method) ->
Fields = emqx_bridge_kafka:fields(Method),
override_documentations(Fields).
@@ -85,6 +89,7 @@ desc(Name) ->
struct_names() ->
[
auth_username_password,
kafka_message,
producer_kafka_opts
].
@@ -245,6 +250,7 @@ kafka_producer_overrides() ->
default => no_compression,
importance => ?IMPORTANCE_HIDDEN
}),
message => mk(ref(kafka_message), #{}),
required_acks => mk(enum([all_isr, leader_only]), #{default => all_isr})
}.

View File

@@ -65,10 +65,6 @@ init_per_suite(Config) ->
end.
end_per_suite(Config) ->
%% emqx_mgmt_api_test_util:end_suite(),
%% ok = emqx_common_test_helpers:stop_apps([emqx_conf]),
%% ok = emqx_connector_test_helpers:stop_apps([emqx_bridge, emqx_resource, emqx_rule_engine]),
%% _ = application:stop(emqx_connector),
Apps = ?config(tc_apps, Config),
emqx_cth_suite:stop(Apps),
ok.
@@ -145,7 +141,6 @@ bridge_config(TestCase, Config) ->
<<"message">> =>
#{
<<"key">> => <<"${.clientid}">>,
<<"timestamp">> => <<"${.timestamp}">>,
<<"value">> => <<"${.}">>
},
<<"partition_count_refresh_interval">> => <<"60s">>,

View File

@@ -33,7 +33,6 @@ bridges.azure_event_hub_producer.my_producer {
max_inflight = 10
message {
key = \"${.clientid}\"
timestamp = \"${.timestamp}\"
value = \"${.}\"
}
partition_count_refresh_interval = 60s