diff --git a/apps/emqx_bridge_azure_event_hub/src/emqx_bridge_azure_event_hub.app.src b/apps/emqx_bridge_azure_event_hub/src/emqx_bridge_azure_event_hub.app.src index e29e9c83a..43033b657 100644 --- a/apps/emqx_bridge_azure_event_hub/src/emqx_bridge_azure_event_hub.app.src +++ b/apps/emqx_bridge_azure_event_hub/src/emqx_bridge_azure_event_hub.app.src @@ -1,6 +1,6 @@ {application, emqx_bridge_azure_event_hub, [ {description, "EMQX Enterprise Azure Event Hub Bridge"}, - {vsn, "0.1.1"}, + {vsn, "0.1.2"}, {registered, []}, {applications, [ kernel, diff --git a/apps/emqx_bridge_azure_event_hub/src/emqx_bridge_azure_event_hub.erl b/apps/emqx_bridge_azure_event_hub/src/emqx_bridge_azure_event_hub.erl index 2d6343b74..c563a35d8 100644 --- a/apps/emqx_bridge_azure_event_hub/src/emqx_bridge_azure_event_hub.erl +++ b/apps/emqx_bridge_azure_event_hub/src/emqx_bridge_azure_event_hub.erl @@ -64,6 +64,10 @@ fields(producer_kafka_opts) -> kafka_producer_overrides() ), override_documentations(Fields); +fields(kafka_message) -> + Fields0 = emqx_bridge_kafka:fields(kafka_message), + Fields = proplists:delete(timestamp, Fields0), + override_documentations(Fields); fields(Method) -> Fields = emqx_bridge_kafka:fields(Method), override_documentations(Fields). @@ -85,6 +89,7 @@ desc(Name) -> struct_names() -> [ auth_username_password, + kafka_message, producer_kafka_opts ]. @@ -245,6 +250,7 @@ kafka_producer_overrides() -> default => no_compression, importance => ?IMPORTANCE_HIDDEN }), + message => mk(ref(kafka_message), #{}), required_acks => mk(enum([all_isr, leader_only]), #{default => all_isr}) }. diff --git a/apps/emqx_bridge_azure_event_hub/test/emqx_bridge_azure_event_hub_producer_SUITE.erl b/apps/emqx_bridge_azure_event_hub/test/emqx_bridge_azure_event_hub_producer_SUITE.erl index af4b87718..e77d724d0 100644 --- a/apps/emqx_bridge_azure_event_hub/test/emqx_bridge_azure_event_hub_producer_SUITE.erl +++ b/apps/emqx_bridge_azure_event_hub/test/emqx_bridge_azure_event_hub_producer_SUITE.erl @@ -65,10 +65,6 @@ init_per_suite(Config) -> end. end_per_suite(Config) -> - %% emqx_mgmt_api_test_util:end_suite(), - %% ok = emqx_common_test_helpers:stop_apps([emqx_conf]), - %% ok = emqx_connector_test_helpers:stop_apps([emqx_bridge, emqx_resource, emqx_rule_engine]), - %% _ = application:stop(emqx_connector), Apps = ?config(tc_apps, Config), emqx_cth_suite:stop(Apps), ok. @@ -145,7 +141,6 @@ bridge_config(TestCase, Config) -> <<"message">> => #{ <<"key">> => <<"${.clientid}">>, - <<"timestamp">> => <<"${.timestamp}">>, <<"value">> => <<"${.}">> }, <<"partition_count_refresh_interval">> => <<"60s">>, diff --git a/apps/emqx_bridge_azure_event_hub/test/emqx_bridge_azure_event_hub_tests.erl b/apps/emqx_bridge_azure_event_hub/test/emqx_bridge_azure_event_hub_tests.erl index d624421c6..92d268d20 100644 --- a/apps/emqx_bridge_azure_event_hub/test/emqx_bridge_azure_event_hub_tests.erl +++ b/apps/emqx_bridge_azure_event_hub/test/emqx_bridge_azure_event_hub_tests.erl @@ -33,7 +33,6 @@ bridges.azure_event_hub_producer.my_producer { max_inflight = 10 message { key = \"${.clientid}\" - timestamp = \"${.timestamp}\" value = \"${.}\" } partition_count_refresh_interval = 60s diff --git a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl index ea6666ea0..d8c934e6b 100644 --- a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl +++ b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl @@ -286,7 +286,7 @@ on_query_async( compile_message_template(T) -> KeyTemplate = maps:get(key, T, <<"${.clientid}">>), ValueTemplate = maps:get(value, T, <<"${.}">>), - TimestampTemplate = maps:get(value, T, <<"${.timestamp}">>), + TimestampTemplate = maps:get(timestamp, T, <<"${.timestamp}">>), #{ key => preproc_tmpl(KeyTemplate), value => preproc_tmpl(ValueTemplate), diff --git a/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_producer_SUITE.erl b/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_producer_SUITE.erl index d93b6dd7d..b920b39ae 100644 --- a/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_producer_SUITE.erl +++ b/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_producer_SUITE.erl @@ -470,7 +470,51 @@ t_failed_creation_then_fix(Config) -> delete_all_bridges(), ok. -t_table_removed(_Config) -> +t_custom_timestamp(_Config) -> + HostsString = kafka_hosts_string_sasl(), + AuthSettings = valid_sasl_plain_settings(), + Hash = erlang:phash2([HostsString, ?FUNCTION_NAME]), + Type = ?BRIDGE_TYPE, + Name = "kafka_bridge_name_" ++ erlang:integer_to_list(Hash), + ResourceId = emqx_bridge_resource:resource_id(Type, Name), + KafkaTopic = "test-topic-one-partition", + MQTTTopic = <<"t/local/kafka">>, + emqx:subscribe(MQTTTopic), + Conf0 = config(#{ + "authentication" => AuthSettings, + "kafka_hosts_string" => HostsString, + "local_topic" => MQTTTopic, + "kafka_topic" => KafkaTopic, + "instance_id" => ResourceId, + "ssl" => #{} + }), + Conf = emqx_utils_maps:deep_put( + [<<"kafka">>, <<"message">>, <<"timestamp">>], + Conf0, + <<"123">> + ), + {ok, _} = emqx_bridge:create(Type, erlang:list_to_atom(Name), Conf), + {ok, Offset} = resolve_kafka_offset(kafka_hosts(), KafkaTopic, 0), + ct:pal("base offset before testing ~p", [Offset]), + Time = erlang:unique_integer(), + BinTime = integer_to_binary(Time), + Msg = #{ + clientid => BinTime, + payload => <<"payload">>, + timestamp => Time + }, + emqx:publish(emqx_message:make(MQTTTopic, emqx_utils_json:encode(Msg))), + {ok, {_, [KafkaMsg]}} = + ?retry( + _Interval = 500, + _NAttempts = 20, + {ok, {_, [_]}} = brod:fetch(kafka_hosts(), KafkaTopic, _Partition = 0, Offset) + ), + ?assertMatch(#kafka_message{ts = 123, ts_type = create}, KafkaMsg), + delete_all_bridges(), + ok. + +t_nonexistent_topic(_Config) -> HostsString = kafka_hosts_string_sasl(), AuthSettings = valid_sasl_plain_settings(), Hash = erlang:phash2([HostsString, ?FUNCTION_NAME]), diff --git a/changes/ee/fix-11513.en.md b/changes/ee/fix-11513.en.md new file mode 100644 index 000000000..51d953933 --- /dev/null +++ b/changes/ee/fix-11513.en.md @@ -0,0 +1 @@ +Fixed a bug which prevented the Kafka Producer bridge from using the correct template for the `timestamp` field.