From 6023012f8b4ee39e2491c2423c8f260063c92628 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Mon, 24 Jun 2024 10:02:26 -0300 Subject: [PATCH 1/7] fix(kafka and derivatives): add `alias` config to avoid clashes with same topic Fixes https://emqx.atlassian.net/browse/EMQX-12592 --- .../test/emqx_bridge_v2_testlib.erl | 39 +++++++- apps/emqx_bridge_azure_event_hub/rebar.config | 2 +- .../emqx_bridge_azure_event_hub_v2_SUITE.erl | 45 ++++++--- apps/emqx_bridge_confluent/rebar.config | 2 +- .../emqx_bridge_confluent_producer_SUITE.erl | 24 ++++- apps/emqx_bridge_kafka/rebar.config | 2 +- .../src/emqx_bridge_kafka_impl_producer.erl | 11 +++ .../emqx_bridge_v2_kafka_producer_SUITE.erl | 93 ++++++++++++++++++- changes/ee/breaking-13327.en.md | 3 + changes/ee/fix-13327.en.md | 1 + mix.exs | 2 +- 11 files changed, 202 insertions(+), 22 deletions(-) create mode 100644 changes/ee/breaking-13327.en.md create mode 100644 changes/ee/fix-13327.en.md diff --git a/apps/emqx_bridge/test/emqx_bridge_v2_testlib.erl b/apps/emqx_bridge/test/emqx_bridge_v2_testlib.erl index 8cf8730b0..41c184af9 100644 --- a/apps/emqx_bridge/test/emqx_bridge_v2_testlib.erl +++ b/apps/emqx_bridge/test/emqx_bridge_v2_testlib.erl @@ -288,6 +288,14 @@ request(Method, Path, Params) -> Error end. +simplify_result(Res) -> + case Res of + {error, {{_, Status, _}, _, Body}} -> + {Status, Body}; + {ok, {{_, Status, _}, _, Body}} -> + {Status, Body} + end. + list_bridges_api() -> Params = [], Path = emqx_mgmt_api_test_util:api_path(["actions"]), @@ -321,7 +329,7 @@ get_bridge_api(BridgeKind, BridgeType, BridgeName) -> Path = emqx_mgmt_api_test_util:api_path([Root, BridgeId]), ct:pal("get bridge ~p (via http)", [{BridgeKind, BridgeType, BridgeName}]), Res = request(get, Path, Params), - ct:pal("get bridge ~p result: ~p", [{BridgeKind, BridgeType, BridgeName}, Res]), + ct:pal("get bridge ~p result:\n ~p", [{BridgeKind, BridgeType, BridgeName}, Res]), Res. create_bridge_api(Config) -> @@ -349,6 +357,26 @@ create_kind_api(Config, Overrides) -> ct:pal("bridge create (~s, http) result:\n ~p", [Kind, Res]), Res. +enable_kind_api(Kind, ConnectorType, ConnectorName) -> + do_enable_disable_kind_api(Kind, ConnectorType, ConnectorName, enable). + +disable_kind_api(Kind, ConnectorType, ConnectorName) -> + do_enable_disable_kind_api(Kind, ConnectorType, ConnectorName, disable). + +do_enable_disable_kind_api(Kind, Type, Name, Op) -> + BridgeId = emqx_bridge_resource:bridge_id(Type, Name), + RootBin = api_path_root(Kind), + {OpPath, OpStr} = + case Op of + enable -> {"true", "enable"}; + disable -> {"false", "disable"} + end, + Path = emqx_mgmt_api_test_util:api_path([RootBin, BridgeId, "enable", OpPath]), + ct:pal(OpStr ++ " ~s ~s (http)", [Kind, BridgeId]), + Res = request(put, Path, []), + ct:pal(OpStr ++ " ~s ~s (http) result:\n ~p", [Kind, BridgeId, Res]), + simplify_result(Res). + create_connector_api(Config) -> create_connector_api(Config, _Overrides = #{}). @@ -453,6 +481,15 @@ update_bridge_api(Config, Overrides) -> ct:pal("update bridge (~s, http) result:\n ~p", [Kind, Res]), Res. +delete_kind_api(Kind, Type, Name) -> + BridgeId = emqx_bridge_resource:bridge_id(Type, Name), + PathRoot = api_path_root(Kind), + Path = emqx_mgmt_api_test_util:api_path([PathRoot, BridgeId]), + ct:pal("deleting bridge (~s, http)", [Kind]), + Res = request(delete, Path, _Params = []), + ct:pal("delete bridge (~s, http) result:\n ~p", [Kind, Res]), + simplify_result(Res). + op_bridge_api(Op, BridgeType, BridgeName) -> op_bridge_api(_Kind = action, Op, BridgeType, BridgeName). diff --git a/apps/emqx_bridge_azure_event_hub/rebar.config b/apps/emqx_bridge_azure_event_hub/rebar.config index a0cc8def3..63f944a5c 100644 --- a/apps/emqx_bridge_azure_event_hub/rebar.config +++ b/apps/emqx_bridge_azure_event_hub/rebar.config @@ -2,7 +2,7 @@ {erl_opts, [debug_info]}. {deps, [ - {wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "1.10.5"}}}, + {wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "1.10.6"}}}, {kafka_protocol, {git, "https://github.com/kafka4beam/kafka_protocol.git", {tag, "4.1.5"}}}, {brod_gssapi, {git, "https://github.com/kafka4beam/brod_gssapi.git", {tag, "v0.1.1"}}}, {brod, {git, "https://github.com/kafka4beam/brod.git", {tag, "3.18.0"}}}, diff --git a/apps/emqx_bridge_azure_event_hub/test/emqx_bridge_azure_event_hub_v2_SUITE.erl b/apps/emqx_bridge_azure_event_hub/test/emqx_bridge_azure_event_hub_v2_SUITE.erl index bc09a5509..661b8819c 100644 --- a/apps/emqx_bridge_azure_event_hub/test/emqx_bridge_azure_event_hub_v2_SUITE.erl +++ b/apps/emqx_bridge_azure_event_hub/test/emqx_bridge_azure_event_hub_v2_SUITE.erl @@ -40,6 +40,8 @@ init_per_suite(Config) -> emqx, emqx_management, emqx_resource, + %% Just for test helpers + brod, emqx_bridge_azure_event_hub, emqx_bridge, emqx_rule_engine, @@ -93,6 +95,9 @@ common_init_per_testcase(TestCase, Config) -> {connector_type, ?CONNECTOR_TYPE}, {connector_name, Name}, {connector_config, ConnectorConfig}, + {action_type, ?BRIDGE_TYPE}, + {action_name, Name}, + {action_config, BridgeConfig}, {bridge_type, ?BRIDGE_TYPE}, {bridge_name, Name}, {bridge_config, BridgeConfig} @@ -100,18 +105,13 @@ common_init_per_testcase(TestCase, Config) -> ]. end_per_testcase(_Testcase, Config) -> - case proplists:get_bool(skip_does_not_apply, Config) of - true -> - ok; - false -> - ProxyHost = ?config(proxy_host, Config), - ProxyPort = ?config(proxy_port, Config), - emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort), - emqx_bridge_v2_testlib:delete_all_bridges_and_connectors(), - emqx_common_test_helpers:call_janitor(60_000), - ok = snabbkaffe:stop(), - ok - end. + ProxyHost = ?config(proxy_host, Config), + ProxyPort = ?config(proxy_port, Config), + emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort), + emqx_bridge_v2_testlib:delete_all_bridges_and_connectors(), + emqx_common_test_helpers:call_janitor(60_000), + ok = snabbkaffe:stop(), + ok. %%------------------------------------------------------------------------------ %% Helper fns @@ -172,7 +172,7 @@ bridge_config(Name, ConnectorId, KafkaTopic) -> #{ <<"enable">> => true, <<"connector">> => ConnectorId, - <<"kafka">> => + <<"parameters">> => #{ <<"buffer">> => #{ @@ -322,7 +322,7 @@ t_same_name_azure_kafka_bridges(Config) -> ), %% then create a Kafka bridge with same name and delete it after creation - ConfigKafka0 = lists:keyreplace(bridge_type, 1, Config, {bridge_type, ?KAFKA_BRIDGE_TYPE}), + ConfigKafka0 = lists:keyreplace(action_type, 1, Config, {action_type, ?KAFKA_BRIDGE_TYPE}), ConfigKafka = lists:keyreplace( connector_type, 1, ConfigKafka0, {connector_type, ?KAFKA_BRIDGE_TYPE} ), @@ -374,3 +374,20 @@ t_http_api_get(Config) -> emqx_bridge_testlib:list_bridges_api() ), ok. + +t_multiple_actions_sharing_topic(Config) -> + ActionConfig0 = ?config(action_config, Config), + ActionConfig = + emqx_utils_maps:deep_merge( + ActionConfig0, + #{<<"parameters">> => #{<<"query_mode">> => <<"sync">>}} + ), + ok = emqx_bridge_v2_kafka_producer_SUITE:t_multiple_actions_sharing_topic( + [ + {type, ?BRIDGE_TYPE_BIN}, + {connector_name, ?config(connector_name, Config)}, + {connector_config, ?config(connector_config, Config)}, + {action_config, ActionConfig} + ] + ), + ok. diff --git a/apps/emqx_bridge_confluent/rebar.config b/apps/emqx_bridge_confluent/rebar.config index f518c8d4f..c6df829f9 100644 --- a/apps/emqx_bridge_confluent/rebar.config +++ b/apps/emqx_bridge_confluent/rebar.config @@ -2,7 +2,7 @@ {erl_opts, [debug_info]}. {deps, [ - {wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "1.10.5"}}}, + {wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "1.10.6"}}}, {kafka_protocol, {git, "https://github.com/kafka4beam/kafka_protocol.git", {tag, "4.1.5"}}}, {brod_gssapi, {git, "https://github.com/kafka4beam/brod_gssapi.git", {tag, "v0.1.1"}}}, {brod, {git, "https://github.com/kafka4beam/brod.git", {tag, "3.18.0"}}}, diff --git a/apps/emqx_bridge_confluent/test/emqx_bridge_confluent_producer_SUITE.erl b/apps/emqx_bridge_confluent/test/emqx_bridge_confluent_producer_SUITE.erl index 724365c58..0b3a22a99 100644 --- a/apps/emqx_bridge_confluent/test/emqx_bridge_confluent_producer_SUITE.erl +++ b/apps/emqx_bridge_confluent/test/emqx_bridge_confluent_producer_SUITE.erl @@ -40,6 +40,8 @@ init_per_suite(Config) -> emqx, emqx_management, emqx_resource, + %% Just for test helpers + brod, emqx_bridge_confluent, emqx_bridge, emqx_rule_engine, @@ -93,6 +95,9 @@ common_init_per_testcase(TestCase, Config) -> {connector_type, ?CONNECTOR_TYPE}, {connector_name, Name}, {connector_config, ConnectorConfig}, + {action_type, ?ACTION_TYPE}, + {action_name, Name}, + {action_config, BridgeConfig}, {bridge_type, ?ACTION_TYPE}, {bridge_name, Name}, {bridge_config, BridgeConfig} @@ -306,7 +311,7 @@ t_same_name_confluent_kafka_bridges(Config) -> ), %% then create a Kafka bridge with same name and delete it after creation - ConfigKafka0 = lists:keyreplace(bridge_type, 1, Config, {bridge_type, ?KAFKA_BRIDGE_TYPE}), + ConfigKafka0 = lists:keyreplace(action_type, 1, Config, {action_type, ?KAFKA_BRIDGE_TYPE}), ConfigKafka = lists:keyreplace( connector_type, 1, ConfigKafka0, {connector_type, ?KAFKA_BRIDGE_TYPE} ), @@ -378,3 +383,20 @@ t_list_v1_bridges(Config) -> [] ), ok. + +t_multiple_actions_sharing_topic(Config) -> + ActionConfig0 = ?config(action_config, Config), + ActionConfig = + emqx_utils_maps:deep_merge( + ActionConfig0, + #{<<"parameters">> => #{<<"query_mode">> => <<"sync">>}} + ), + ok = emqx_bridge_v2_kafka_producer_SUITE:t_multiple_actions_sharing_topic( + [ + {type, ?ACTION_TYPE_BIN}, + {connector_name, ?config(connector_name, Config)}, + {connector_config, ?config(connector_config, Config)}, + {action_config, ActionConfig} + ] + ), + ok. diff --git a/apps/emqx_bridge_kafka/rebar.config b/apps/emqx_bridge_kafka/rebar.config index 500c5a394..5e394a261 100644 --- a/apps/emqx_bridge_kafka/rebar.config +++ b/apps/emqx_bridge_kafka/rebar.config @@ -2,7 +2,7 @@ {erl_opts, [debug_info]}. {deps, [ - {wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "1.10.5"}}}, + {wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "1.10.6"}}}, {kafka_protocol, {git, "https://github.com/kafka4beam/kafka_protocol.git", {tag, "4.1.5"}}}, {brod_gssapi, {git, "https://github.com/kafka4beam/brod_gssapi.git", {tag, "v0.1.1"}}}, {brod, {git, "https://github.com/kafka4beam/brod.git", {tag, "3.18.0"}}}, diff --git a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl index 872e0027a..6b85cbbe4 100644 --- a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl +++ b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl @@ -324,6 +324,12 @@ on_query( }), do_send_msg(sync, KafkaMessage, Producers, SyncTimeout) catch + error:{invalid_partition_count, _Count, _Partitioner} -> + ?tp("kafka_producer_invalid_partition_count", #{ + action_id => MessageTag, + query_mode => sync + }), + {error, invalid_partition_count}; throw:{bad_kafka_header, _} = Error -> ?tp( emqx_bridge_kafka_impl_producer_sync_query_failed, @@ -385,6 +391,10 @@ on_query_async( do_send_msg(async, KafkaMessage, Producers, AsyncReplyFn) catch error:{invalid_partition_count, _Count, _Partitioner} -> + ?tp("kafka_producer_invalid_partition_count", #{ + action_id => MessageTag, + query_mode => async + }), {error, invalid_partition_count}; throw:{bad_kafka_header, _} = Error -> ?tp( @@ -690,6 +700,7 @@ producers_config(BridgeType, BridgeName, Input, IsDryRun, BridgeV2Id) -> max_batch_bytes => MaxBatchBytes, max_send_ahead => MaxInflight - 1, compression => Compression, + alias => BridgeV2Id, telemetry_meta_data => #{bridge_id => BridgeV2Id}, max_partitions => MaxPartitions }. diff --git a/apps/emqx_bridge_kafka/test/emqx_bridge_v2_kafka_producer_SUITE.erl b/apps/emqx_bridge_kafka/test/emqx_bridge_v2_kafka_producer_SUITE.erl index afd6e05a5..ba558792b 100644 --- a/apps/emqx_bridge_kafka/test/emqx_bridge_v2_kafka_producer_SUITE.erl +++ b/apps/emqx_bridge_kafka/test/emqx_bridge_v2_kafka_producer_SUITE.erl @@ -142,6 +142,9 @@ check_send_message_with_bridge(BridgeName) -> check_kafka_message_payload(Offset, Payload). send_message(ActionName) -> + send_message(?TYPE, ActionName). + +send_message(Type, ActionName) -> %% ###################################### %% Create Kafka message %% ###################################### @@ -157,8 +160,8 @@ send_message(ActionName) -> %% ###################################### %% Send message %% ###################################### - emqx_bridge_v2:send_message(?TYPE, ActionName, Msg, #{}), - #{offset => Offset, payload => Payload}. + Res = emqx_bridge_v2:send_message(Type, ActionName, Msg, #{}), + #{offset => Offset, payload => Payload, result => Res}. resolve_kafka_offset() -> KafkaTopic = emqx_bridge_kafka_impl_producer_SUITE:test_topic_one_partition(), @@ -285,6 +288,18 @@ action_api_spec_props_for_get() -> emqx_bridge_v2_testlib:actions_api_spec_schemas(), Props. +assert_status_api(Line, Type, Name, Status) -> + ?assertMatch( + {ok, + {{_, 200, _}, _, #{ + <<"status">> := Status, + <<"node_status">> := [#{<<"status">> := Status}] + }}}, + emqx_bridge_v2_testlib:get_bridge_api(Type, Name), + #{line => Line, name => Name, expected_status => Status} + ). +-define(assertStatusAPI(TYPE, NAME, STATUS), assert_status_api(?LINE, TYPE, NAME, STATUS)). + %%------------------------------------------------------------------------------ %% Testcases %%------------------------------------------------------------------------------ @@ -662,3 +677,77 @@ t_ancient_v1_config_migration_without_local_topic(Config) -> erpc:call(Node, fun emqx_bridge_v2:list/0) ), ok. + +%% Tests that deleting/disabling an action that share the same Kafka topic with other +%% actions do not disturb the latter. +t_multiple_actions_sharing_topic(Config) -> + Type = proplists:get_value(type, Config, ?TYPE), + ConnectorName = proplists:get_value(connector_name, Config, <<"c">>), + ConnectorConfig = proplists:get_value(connector_config, Config, connector_config()), + ActionConfig = proplists:get_value(action_config, Config, action_config(ConnectorName)), + ?check_trace( + begin + ConnectorParams = [ + {connector_config, ConnectorConfig}, + {connector_name, ConnectorName}, + {connector_type, Type} + ], + ActionName1 = <<"a1">>, + ActionParams1 = [ + {action_config, ActionConfig}, + {action_name, ActionName1}, + {action_type, Type} + ], + ActionName2 = <<"a2">>, + ActionParams2 = [ + {action_config, ActionConfig}, + {action_name, ActionName2}, + {action_type, Type} + ], + {ok, {{_, 201, _}, _, #{}}} = + emqx_bridge_v2_testlib:create_connector_api(ConnectorParams), + {ok, {{_, 201, _}, _, #{}}} = + emqx_bridge_v2_testlib:create_action_api(ActionParams1), + {ok, {{_, 201, _}, _, #{}}} = + emqx_bridge_v2_testlib:create_action_api(ActionParams2), + RuleTopic = <<"t/a2">>, + {ok, _} = emqx_bridge_v2_testlib:create_rule_and_action_http(Type, RuleTopic, Config), + + ?assertStatusAPI(Type, ActionName1, <<"connected">>), + ?assertStatusAPI(Type, ActionName2, <<"connected">>), + + %% Disabling a1 shouldn't disturb a2. + ?assertMatch( + {204, _}, emqx_bridge_v2_testlib:disable_kind_api(action, Type, ActionName1) + ), + + ?assertStatusAPI(Type, ActionName1, <<"disconnected">>), + ?assertStatusAPI(Type, ActionName2, <<"connected">>), + + ?assertMatch(#{result := ok}, send_message(Type, ActionName2)), + ?assertStatusAPI(Type, ActionName2, <<"connected">>), + + ?assertMatch( + {204, _}, + emqx_bridge_v2_testlib:enable_kind_api(action, Type, ActionName1) + ), + ?assertStatusAPI(Type, ActionName1, <<"connected">>), + ?assertStatusAPI(Type, ActionName2, <<"connected">>), + ?assertMatch(#{result := ok}, send_message(Type, ActionName2)), + + %% Deleting also shouldn't disrupt a2. + ?assertMatch( + {204, _}, + emqx_bridge_v2_testlib:delete_kind_api(action, Type, ActionName1) + ), + ?assertStatusAPI(Type, ActionName2, <<"connected">>), + ?assertMatch(#{result := ok}, send_message(Type, ActionName2)), + + ok + end, + fun(Trace) -> + ?assertEqual([], ?of_kind("kafka_producer_invalid_partition_count", Trace)), + ok + end + ), + ok. diff --git a/changes/ee/breaking-13327.en.md b/changes/ee/breaking-13327.en.md new file mode 100644 index 000000000..645ab0c73 --- /dev/null +++ b/changes/ee/breaking-13327.en.md @@ -0,0 +1,3 @@ +The directory path scheme for on-disk Kafka/Confluent/Azure Event Hub buffers has changed. It now uses the Action name instead of the topic name. + +Updating to this version will invalidate (not use) old paths, and will require manual cleanup of the old directories. diff --git a/changes/ee/fix-13327.en.md b/changes/ee/fix-13327.en.md new file mode 100644 index 000000000..e81490c5c --- /dev/null +++ b/changes/ee/fix-13327.en.md @@ -0,0 +1 @@ +Fixed an issue with Kafka, Confluent and Azure Event Hub bridges where different actions targeting the same topic could break one another when being deleted or disabled. diff --git a/mix.exs b/mix.exs index a94707da2..68a79c73e 100644 --- a/mix.exs +++ b/mix.exs @@ -211,7 +211,7 @@ defmodule EMQXUmbrella.MixProject do {:hstreamdb_erl, github: "hstreamdb/hstreamdb_erl", tag: "0.5.18+v0.18.1+ezstd-v1.0.5-emqx1"}, {:influxdb, github: "emqx/influxdb-client-erl", tag: "1.1.13", override: true}, - {:wolff, github: "kafka4beam/wolff", tag: "1.10.5"}, + {:wolff, github: "kafka4beam/wolff", tag: "1.10.6"}, {:kafka_protocol, github: "kafka4beam/kafka_protocol", tag: "4.1.5", override: true}, {:brod_gssapi, github: "kafka4beam/brod_gssapi", tag: "v0.1.1"}, {:brod, github: "kafka4beam/brod", tag: "3.18.0"}, From ed5e6599d92d7c85d89ddd076864b5bb2545ce4b Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Mon, 24 Jun 2024 17:01:29 -0300 Subject: [PATCH 2/7] fix(buffer worker, kafka): send reply when async call fails immediately Fixes https://emqx.atlassian.net/browse/EMQX-12585 --- .../src/emqx_bridge_kafka_impl_producer.erl | 4 +- .../emqx_bridge_v2_kafka_producer_SUITE.erl | 130 ++++++++++++++++++ .../src/emqx_resource_buffer_worker.erl | 36 +++-- .../src/emqx_rule_runtime.erl | 2 + .../test/emqx_rule_engine_SUITE.erl | 6 +- 5 files changed, 164 insertions(+), 14 deletions(-) diff --git a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl index 6b85cbbe4..abdabef8c 100644 --- a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl +++ b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl @@ -329,7 +329,7 @@ on_query( action_id => MessageTag, query_mode => sync }), - {error, invalid_partition_count}; + {error, {unrecoverable_error, invalid_partition_count}}; throw:{bad_kafka_header, _} = Error -> ?tp( emqx_bridge_kafka_impl_producer_sync_query_failed, @@ -395,7 +395,7 @@ on_query_async( action_id => MessageTag, query_mode => async }), - {error, invalid_partition_count}; + {error, {unrecoverable_error, invalid_partition_count}}; throw:{bad_kafka_header, _} = Error -> ?tp( emqx_bridge_kafka_impl_producer_async_query_failed, diff --git a/apps/emqx_bridge_kafka/test/emqx_bridge_v2_kafka_producer_SUITE.erl b/apps/emqx_bridge_kafka/test/emqx_bridge_v2_kafka_producer_SUITE.erl index ba558792b..a7918610e 100644 --- a/apps/emqx_bridge_kafka/test/emqx_bridge_v2_kafka_producer_SUITE.erl +++ b/apps/emqx_bridge_kafka/test/emqx_bridge_v2_kafka_producer_SUITE.erl @@ -300,6 +300,9 @@ assert_status_api(Line, Type, Name, Status) -> ). -define(assertStatusAPI(TYPE, NAME, STATUS), assert_status_api(?LINE, TYPE, NAME, STATUS)). +get_rule_metrics(RuleId) -> + emqx_metrics_worker:get_metrics(rule_metrics, RuleId). + %%------------------------------------------------------------------------------ %% Testcases %%------------------------------------------------------------------------------ @@ -678,6 +681,133 @@ t_ancient_v1_config_migration_without_local_topic(Config) -> ), ok. +%% Checks that, if Kafka raises `invalid_partition_count' error, we bump the corresponding +%% failure rule action metric. +t_invalid_partition_count_metrics(Config) -> + Type = proplists:get_value(type, Config, ?TYPE), + ConnectorName = proplists:get_value(connector_name, Config, <<"c">>), + ConnectorConfig = proplists:get_value(connector_config, Config, connector_config()), + ActionConfig1 = proplists:get_value(action_config, Config, action_config(ConnectorName)), + ?check_trace( + #{timetrap => 10_000}, + begin + ConnectorParams = [ + {connector_config, ConnectorConfig}, + {connector_name, ConnectorName}, + {connector_type, Type} + ], + ActionName = <<"a">>, + ActionParams = [ + {action_config, ActionConfig1}, + {action_name, ActionName}, + {action_type, Type} + ], + {ok, {{_, 201, _}, _, #{}}} = + emqx_bridge_v2_testlib:create_connector_api(ConnectorParams), + + {ok, {{_, 201, _}, _, #{}}} = + emqx_bridge_v2_testlib:create_action_api(ActionParams), + RuleTopic = <<"t/a">>, + {ok, #{<<"id">> := RuleId}} = + emqx_bridge_v2_testlib:create_rule_and_action_http(Type, RuleTopic, [ + {bridge_name, ActionName} + ]), + + {ok, C} = emqtt:start_link([]), + {ok, _} = emqtt:connect(C), + + %%-------------------------------------------- + ?tp(notice, "sync", #{}), + %%-------------------------------------------- + %% Artificially force sync query to be used; otherwise, it's only used when the + %% resource is blocked and retrying. + ok = meck:new(emqx_bridge_kafka_impl_producer, [passthrough, no_history]), + on_exit(fun() -> catch meck:unload() end), + ok = meck:expect(emqx_bridge_kafka_impl_producer, query_mode, 1, simple_sync), + + %% Simulate `invalid_partition_count' + emqx_common_test_helpers:with_mock( + wolff, + send_sync, + fun(_Producers, _Msgs, _Timeout) -> + error({invalid_partition_count, 0, partitioner}) + end, + fun() -> + {{ok, _}, {ok, _}} = + ?wait_async_action( + emqtt:publish(C, RuleTopic, <<"hi">>, 2), + #{ + ?snk_kind := "kafka_producer_invalid_partition_count", + query_mode := sync + } + ), + ?assertMatch( + #{ + counters := #{ + 'actions.total' := 1, + 'actions.failed' := 1 + } + }, + get_rule_metrics(RuleId) + ), + ok + end + ), + + %%-------------------------------------------- + %% Same thing, but async call + ?tp(notice, "async", #{}), + %%-------------------------------------------- + ok = meck:expect( + emqx_bridge_kafka_impl_producer, + query_mode, + fun(Conf) -> meck:passthrough([Conf]) end + ), + ok = emqx_bridge_v2:remove(actions, Type, ActionName), + {ok, {{_, 201, _}, _, #{}}} = + emqx_bridge_v2_testlib:create_action_api( + ActionParams, + #{<<"parameters">> => #{<<"query_mode">> => <<"async">>}} + ), + + %% Simulate `invalid_partition_count' + emqx_common_test_helpers:with_mock( + wolff, + send, + fun(_Producers, _Msgs, _Timeout) -> + error({invalid_partition_count, 0, partitioner}) + end, + fun() -> + {{ok, _}, {ok, _}} = + ?wait_async_action( + emqtt:publish(C, RuleTopic, <<"hi">>, 2), + #{?snk_kind := "rule_engine_applied_all_rules"} + ), + ?assertMatch( + #{ + counters := #{ + 'actions.total' := 2, + 'actions.failed' := 2 + } + }, + get_rule_metrics(RuleId) + ), + ok + end + ), + + ok + end, + fun(Trace) -> + ?assertMatch( + [#{query_mode := sync}, #{query_mode := async} | _], + ?of_kind("kafka_producer_invalid_partition_count", Trace) + ), + ok + end + ), + ok. + %% Tests that deleting/disabling an action that share the same Kafka topic with other %% actions do not disturb the latter. t_multiple_actions_sharing_topic(Config) -> diff --git a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl index 5f269c112..05d42ed1a 100644 --- a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl +++ b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl @@ -1401,16 +1401,26 @@ apply_query_fun( query_opts => QueryOpts, min_query => minimize(Query) }, + IsSimpleQuery = maps:get(simple_query, QueryOpts, false), IsRetriable = false, AsyncWorkerMRef = undefined, InflightItem = ?INFLIGHT_ITEM(Ref, Query, IsRetriable, AsyncWorkerMRef), ok = inflight_append(InflightTID, InflightItem), case pre_query_channel_check(Request, Channels, QueryOpts) of ok -> - Result = Mod:on_query_async( - extract_connector_id(Id), Request, {ReplyFun, [ReplyContext]}, ResSt - ), - {async_return, Result}; + case + Mod:on_query_async( + extract_connector_id(Id), Request, {ReplyFun, [ReplyContext]}, ResSt + ) + of + {error, _} = Error when IsSimpleQuery -> + %% If this callback returns error, we assume it won't reply + %% anything else and won't retry. + maybe_reply_to(Error, QueryOpts), + Error; + Result -> + {async_return, Result} + end; Error -> maybe_reply_to(Error, QueryOpts) end @@ -1480,16 +1490,26 @@ apply_query_fun( Requests = lists:map( fun(?QUERY(_ReplyTo, Request, _, _ExpireAt, _TraceCtx)) -> Request end, Batch ), + IsSimpleQuery = maps:get(simple_query, QueryOpts, false), IsRetriable = false, AsyncWorkerMRef = undefined, InflightItem = ?INFLIGHT_ITEM(Ref, Batch, IsRetriable, AsyncWorkerMRef), ok = inflight_append(InflightTID, InflightItem), case pre_query_channel_check(FirstRequest, Channels, QueryOpts) of ok -> - Result = Mod:on_batch_query_async( - extract_connector_id(Id), Requests, {ReplyFun, [ReplyContext]}, ResSt - ), - {async_return, Result}; + case + Mod:on_batch_query_async( + extract_connector_id(Id), Requests, {ReplyFun, [ReplyContext]}, ResSt + ) + of + {error, _} = Error when IsSimpleQuery -> + %% If this callback returns error, we assume it won't reply + %% anything else and won't retry. + maybe_reply_to(Error, QueryOpts), + Error; + Result -> + {async_return, Result} + end; Error -> maybe_reply_to(Error, QueryOpts) end diff --git a/apps/emqx_rule_engine/src/emqx_rule_runtime.erl b/apps/emqx_rule_engine/src/emqx_rule_runtime.erl index 21a42c283..0d2b353b1 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_runtime.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_runtime.erl @@ -20,6 +20,7 @@ -include_lib("emqx/include/logger.hrl"). -include_lib("emqx/include/emqx_trace.hrl"). -include_lib("emqx_resource/include/emqx_resource_errors.hrl"). +-include_lib("snabbkaffe/include/trace.hrl"). -export([ apply_rule/3, @@ -58,6 +59,7 @@ %%------------------------------------------------------------------------------ -spec apply_rules(list(rule()), columns(), envs()) -> ok. apply_rules([], _Columns, _Envs) -> + ?tp("rule_engine_applied_all_rules", #{}), ok; apply_rules([#{enable := false} | More], Columns, Envs) -> apply_rules(More, Columns, Envs); diff --git a/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl b/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl index 680aac759..da1df58ea 100644 --- a/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl +++ b/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl @@ -216,10 +216,8 @@ init_per_group(metrics_fail_simple, Config) -> (_) -> simple_async end), meck:expect(?BRIDGE_IMPL, on_query, 3, {error, {unrecoverable_error, mecked_failure}}), - meck:expect(?BRIDGE_IMPL, on_query_async, fun(_, _, {ReplyFun, Args}, _) -> - Result = {error, {unrecoverable_error, mecked_failure}}, - erlang:apply(ReplyFun, Args ++ [Result]), - Result + meck:expect(?BRIDGE_IMPL, on_query_async, fun(_, _, {_ReplyFun, _Args}, _) -> + {error, {unrecoverable_error, mecked_failure}} end), [{mecked, [?BRIDGE_IMPL]} | Config]; init_per_group(_Groupname, Config) -> From 164a507899c85d43b45f8e64954ecf5b1b3dc733 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Mon, 24 Jun 2024 15:11:49 -0300 Subject: [PATCH 3/7] test(pulsar): add testcase for different producers using the same topic --- .../test/emqx_bridge_pulsar_v2_SUITE.erl | 251 ++++++++++++------ 1 file changed, 171 insertions(+), 80 deletions(-) diff --git a/apps/emqx_bridge_pulsar/test/emqx_bridge_pulsar_v2_SUITE.erl b/apps/emqx_bridge_pulsar/test/emqx_bridge_pulsar_v2_SUITE.erl index 0636806de..11caa15c6 100644 --- a/apps/emqx_bridge_pulsar/test/emqx_bridge_pulsar_v2_SUITE.erl +++ b/apps/emqx_bridge_pulsar/test/emqx_bridge_pulsar_v2_SUITE.erl @@ -127,23 +127,18 @@ init_per_testcase(TestCase, Config) -> common_init_per_testcase(TestCase, Config). end_per_testcase(_Testcase, Config) -> - case proplists:get_bool(skip_does_not_apply, Config) of - true -> - ok; - false -> - ok = emqx_config:delete_override_conf_files(), - ProxyHost = ?config(proxy_host, Config), - ProxyPort = ?config(proxy_port, Config), - emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort), - emqx_bridge_v2_testlib:delete_all_bridges(), - stop_consumer(Config), - %% in CI, apparently this needs more time since the - %% machines struggle with all the containers running... - emqx_common_test_helpers:call_janitor(60_000), - ok = snabbkaffe:stop(), - flush_consumed(), - ok - end. + ok = emqx_config:delete_override_conf_files(), + ProxyHost = ?config(proxy_host, Config), + ProxyPort = ?config(proxy_port, Config), + emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort), + emqx_bridge_v2_testlib:delete_all_bridges(), + stop_consumer(Config), + %% in CI, apparently this needs more time since the + %% machines struggle with all the containers running... + emqx_common_test_helpers:call_janitor(60_000), + ok = snabbkaffe:stop(), + flush_consumed(), + ok. common_init_per_testcase(TestCase, Config0) -> ct:timetrap(timer:seconds(60)), @@ -160,6 +155,10 @@ common_init_per_testcase(TestCase, Config0) -> ok = snabbkaffe:start_trace(), Config. +%%------------------------------------------------------------------------------ +%% Helper fns +%%------------------------------------------------------------------------------ + create_connector(Name, Config) -> Connector = pulsar_connector(Config), {ok, _} = emqx_connector:create(?TYPE, Name, Connector). @@ -174,69 +173,6 @@ create_action(Name, Config) -> delete_action(Name) -> ok = emqx_bridge_v2:remove(actions, ?TYPE, Name). -%%------------------------------------------------------------------------------ -%% Testcases -%%------------------------------------------------------------------------------ - -t_action_probe(Config) -> - Name = atom_to_binary(?FUNCTION_NAME), - Action = pulsar_action(Config), - {ok, Res0} = emqx_bridge_v2_testlib:probe_bridge_api(action, ?TYPE, Name, Action), - ?assertMatch({{_, 204, _}, _, _}, Res0), - ok. - -t_action(Config) -> - Name = atom_to_binary(?FUNCTION_NAME), - create_action(Name, Config), - Actions = emqx_bridge_v2:list(actions), - Any = fun(#{name := BName}) -> BName =:= Name end, - ?assert(lists:any(Any, Actions), Actions), - Topic = <<"lkadfdaction">>, - {ok, #{id := RuleId}} = emqx_rule_engine:create_rule( - #{ - sql => <<"select * from \"", Topic/binary, "\"">>, - id => atom_to_binary(?FUNCTION_NAME), - actions => [<<"pulsar:", Name/binary>>], - description => <<"bridge_v2 send msg to pulsar action">> - } - ), - on_exit(fun() -> emqx_rule_engine:delete_rule(RuleId) end), - MQTTClientID = <<"pulsar_mqtt_clientid">>, - {ok, C1} = emqtt:start_link([{clean_start, true}, {clientid, MQTTClientID}]), - {ok, _} = emqtt:connect(C1), - ReqPayload = payload(), - ReqPayloadBin = emqx_utils_json:encode(ReqPayload), - {ok, _} = emqtt:publish(C1, Topic, #{}, ReqPayloadBin, [{qos, 1}, {retain, false}]), - [#{<<"clientid">> := ClientID, <<"payload">> := RespPayload}] = receive_consumed(5000), - ?assertEqual(MQTTClientID, ClientID), - ?assertEqual(ReqPayload, emqx_utils_json:decode(RespPayload)), - ok = emqtt:disconnect(C1), - InstanceId = instance_id(actions, Name), - ?retry( - 100, - 20, - ?assertMatch( - #{ - counters := #{ - dropped := 0, - success := 1, - matched := 1, - failed := 0, - received := 0 - } - }, - emqx_resource:get_metrics(InstanceId) - ) - ), - ok = delete_action(Name), - ActionsAfterDelete = emqx_bridge_v2:list(actions), - ?assertNot(lists:any(Any, ActionsAfterDelete), ActionsAfterDelete), - ok. - -%%------------------------------------------------------------------------------ -%% Helper fns -%%------------------------------------------------------------------------------ - pulsar_connector(Config) -> PulsarHost = ?config(pulsar_host, Config), PulsarPort = ?config(pulsar_port, Config), @@ -455,3 +391,158 @@ maybe_skip_without_ci() -> _ -> {skip, no_pulsar} end. + +assert_status_api(Line, Type, Name, Status) -> + ?assertMatch( + {ok, + {{_, 200, _}, _, #{ + <<"status">> := Status, + <<"node_status">> := [#{<<"status">> := Status}] + }}}, + emqx_bridge_v2_testlib:get_bridge_api(Type, Name), + #{line => Line, name => Name, expected_status => Status} + ). +-define(assertStatusAPI(TYPE, NAME, STATUS), assert_status_api(?LINE, TYPE, NAME, STATUS)). + +%%------------------------------------------------------------------------------ +%% Testcases +%%------------------------------------------------------------------------------ + +t_action_probe(Config) -> + Name = atom_to_binary(?FUNCTION_NAME), + Action = pulsar_action(Config), + {ok, Res0} = emqx_bridge_v2_testlib:probe_bridge_api(action, ?TYPE, Name, Action), + ?assertMatch({{_, 204, _}, _, _}, Res0), + ok. + +t_action(Config) -> + Name = atom_to_binary(?FUNCTION_NAME), + create_action(Name, Config), + Actions = emqx_bridge_v2:list(actions), + Any = fun(#{name := BName}) -> BName =:= Name end, + ?assert(lists:any(Any, Actions), Actions), + Topic = <<"lkadfdaction">>, + {ok, #{id := RuleId}} = emqx_rule_engine:create_rule( + #{ + sql => <<"select * from \"", Topic/binary, "\"">>, + id => atom_to_binary(?FUNCTION_NAME), + actions => [<<"pulsar:", Name/binary>>], + description => <<"bridge_v2 send msg to pulsar action">> + } + ), + on_exit(fun() -> emqx_rule_engine:delete_rule(RuleId) end), + MQTTClientID = <<"pulsar_mqtt_clientid">>, + {ok, C1} = emqtt:start_link([{clean_start, true}, {clientid, MQTTClientID}]), + {ok, _} = emqtt:connect(C1), + ReqPayload = payload(), + ReqPayloadBin = emqx_utils_json:encode(ReqPayload), + {ok, _} = emqtt:publish(C1, Topic, #{}, ReqPayloadBin, [{qos, 1}, {retain, false}]), + [#{<<"clientid">> := ClientID, <<"payload">> := RespPayload}] = receive_consumed(5000), + ?assertEqual(MQTTClientID, ClientID), + ?assertEqual(ReqPayload, emqx_utils_json:decode(RespPayload)), + ok = emqtt:disconnect(C1), + InstanceId = instance_id(actions, Name), + ?retry( + 100, + 20, + ?assertMatch( + #{ + counters := #{ + dropped := 0, + success := 1, + matched := 1, + failed := 0, + received := 0 + } + }, + emqx_resource:get_metrics(InstanceId) + ) + ), + ok = delete_action(Name), + ActionsAfterDelete = emqx_bridge_v2:list(actions), + ?assertNot(lists:any(Any, ActionsAfterDelete), ActionsAfterDelete), + ok. + +%% Tests that deleting/disabling an action that share the same Pulsar topic with other +%% actions do not disturb the latter. +t_multiple_actions_sharing_topic(Config) -> + Type = ?TYPE, + ConnectorName = <<"c">>, + ConnectorConfig = pulsar_connector(Config), + ActionConfig = pulsar_action(Config), + ?check_trace( + begin + ConnectorParams = [ + {connector_config, ConnectorConfig}, + {connector_name, ConnectorName}, + {connector_type, Type} + ], + ActionName1 = <<"a1">>, + ActionParams1 = [ + {action_config, ActionConfig}, + {action_name, ActionName1}, + {action_type, Type} + ], + ActionName2 = <<"a2">>, + ActionParams2 = [ + {action_config, ActionConfig}, + {action_name, ActionName2}, + {action_type, Type} + ], + {ok, {{_, 201, _}, _, #{}}} = + emqx_bridge_v2_testlib:create_connector_api(ConnectorParams), + {ok, {{_, 201, _}, _, #{}}} = + emqx_bridge_v2_testlib:create_action_api(ActionParams1), + {ok, {{_, 201, _}, _, #{}}} = + emqx_bridge_v2_testlib:create_action_api(ActionParams2), + + ?assertStatusAPI(Type, ActionName1, <<"connected">>), + ?assertStatusAPI(Type, ActionName2, <<"connected">>), + + RuleTopic = <<"t/a2">>, + {ok, _} = emqx_bridge_v2_testlib:create_rule_and_action_http(Type, RuleTopic, [ + {bridge_name, ActionName2} + ]), + {ok, C} = emqtt:start_link([]), + {ok, _} = emqtt:connect(C), + SendMessage = fun() -> + ReqPayload = payload(), + ReqPayloadBin = emqx_utils_json:encode(ReqPayload), + {ok, _} = emqtt:publish(C, RuleTopic, #{}, ReqPayloadBin, [ + {qos, 1}, {retain, false} + ]), + ok + end, + + %% Disabling a1 shouldn't disturb a2. + ?assertMatch( + {204, _}, emqx_bridge_v2_testlib:disable_kind_api(action, Type, ActionName1) + ), + + ?assertStatusAPI(Type, ActionName1, <<"disconnected">>), + ?assertStatusAPI(Type, ActionName2, <<"connected">>), + + ?assertMatch(ok, SendMessage()), + ?assertStatusAPI(Type, ActionName2, <<"connected">>), + + ?assertMatch( + {204, _}, + emqx_bridge_v2_testlib:enable_kind_api(action, Type, ActionName1) + ), + ?assertStatusAPI(Type, ActionName1, <<"connected">>), + ?assertStatusAPI(Type, ActionName2, <<"connected">>), + ?assertMatch(ok, SendMessage()), + + %% Deleting also shouldn't disrupt a2. + ?assertMatch( + {204, _}, + emqx_bridge_v2_testlib:delete_kind_api(action, Type, ActionName1) + ), + ?assertStatusAPI(Type, ActionName2, <<"connected">>), + ?assertMatch(ok, SendMessage()), + + ok + end, + [] + ), + ok. From 4c3c86e9190bda19383c3c4ec1f000ad11599026 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Wed, 26 Jun 2024 13:45:08 -0300 Subject: [PATCH 4/7] chore: bump wolff -> 2.0.0 --- apps/emqx_bridge_azure_event_hub/rebar.config | 2 +- apps/emqx_bridge_confluent/rebar.config | 2 +- apps/emqx_bridge_kafka/rebar.config | 2 +- mix.exs | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) diff --git a/apps/emqx_bridge_azure_event_hub/rebar.config b/apps/emqx_bridge_azure_event_hub/rebar.config index 63f944a5c..76ea7fa6c 100644 --- a/apps/emqx_bridge_azure_event_hub/rebar.config +++ b/apps/emqx_bridge_azure_event_hub/rebar.config @@ -2,7 +2,7 @@ {erl_opts, [debug_info]}. {deps, [ - {wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "1.10.6"}}}, + {wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "2.0.0"}}}, {kafka_protocol, {git, "https://github.com/kafka4beam/kafka_protocol.git", {tag, "4.1.5"}}}, {brod_gssapi, {git, "https://github.com/kafka4beam/brod_gssapi.git", {tag, "v0.1.1"}}}, {brod, {git, "https://github.com/kafka4beam/brod.git", {tag, "3.18.0"}}}, diff --git a/apps/emqx_bridge_confluent/rebar.config b/apps/emqx_bridge_confluent/rebar.config index c6df829f9..1a91f501d 100644 --- a/apps/emqx_bridge_confluent/rebar.config +++ b/apps/emqx_bridge_confluent/rebar.config @@ -2,7 +2,7 @@ {erl_opts, [debug_info]}. {deps, [ - {wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "1.10.6"}}}, + {wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "2.0.0"}}}, {kafka_protocol, {git, "https://github.com/kafka4beam/kafka_protocol.git", {tag, "4.1.5"}}}, {brod_gssapi, {git, "https://github.com/kafka4beam/brod_gssapi.git", {tag, "v0.1.1"}}}, {brod, {git, "https://github.com/kafka4beam/brod.git", {tag, "3.18.0"}}}, diff --git a/apps/emqx_bridge_kafka/rebar.config b/apps/emqx_bridge_kafka/rebar.config index 5e394a261..b89c9190f 100644 --- a/apps/emqx_bridge_kafka/rebar.config +++ b/apps/emqx_bridge_kafka/rebar.config @@ -2,7 +2,7 @@ {erl_opts, [debug_info]}. {deps, [ - {wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "1.10.6"}}}, + {wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "2.0.0"}}}, {kafka_protocol, {git, "https://github.com/kafka4beam/kafka_protocol.git", {tag, "4.1.5"}}}, {brod_gssapi, {git, "https://github.com/kafka4beam/brod_gssapi.git", {tag, "v0.1.1"}}}, {brod, {git, "https://github.com/kafka4beam/brod.git", {tag, "3.18.0"}}}, diff --git a/mix.exs b/mix.exs index 68a79c73e..e4565d6d7 100644 --- a/mix.exs +++ b/mix.exs @@ -211,7 +211,7 @@ defmodule EMQXUmbrella.MixProject do {:hstreamdb_erl, github: "hstreamdb/hstreamdb_erl", tag: "0.5.18+v0.18.1+ezstd-v1.0.5-emqx1"}, {:influxdb, github: "emqx/influxdb-client-erl", tag: "1.1.13", override: true}, - {:wolff, github: "kafka4beam/wolff", tag: "1.10.6"}, + {:wolff, github: "kafka4beam/wolff", tag: "2.0.0"}, {:kafka_protocol, github: "kafka4beam/kafka_protocol", tag: "4.1.5", override: true}, {:brod_gssapi, github: "kafka4beam/brod_gssapi", tag: "v0.1.1"}, {:brod, github: "kafka4beam/brod", tag: "3.18.0"}, From 4bd0abc93faa28e0956c75ebde9d78ab4cdfb4ef Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Thu, 27 Jun 2024 09:22:06 -0300 Subject: [PATCH 5/7] chore: bump app vsns --- apps/emqx_bridge_kafka/src/emqx_bridge_kafka.app.src | 2 +- apps/emqx_resource/src/emqx_resource.app.src | 2 +- apps/emqx_rule_engine/src/emqx_rule_engine.app.src | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.app.src b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.app.src index 9a09345eb..0e906203d 100644 --- a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.app.src +++ b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.app.src @@ -1,7 +1,7 @@ %% -*- mode: erlang -*- {application, emqx_bridge_kafka, [ {description, "EMQX Enterprise Kafka Bridge"}, - {vsn, "0.3.2"}, + {vsn, "0.3.3"}, {registered, [emqx_bridge_kafka_consumer_sup]}, {applications, [ kernel, diff --git a/apps/emqx_resource/src/emqx_resource.app.src b/apps/emqx_resource/src/emqx_resource.app.src index fb56e77c6..6e35949a9 100644 --- a/apps/emqx_resource/src/emqx_resource.app.src +++ b/apps/emqx_resource/src/emqx_resource.app.src @@ -1,7 +1,7 @@ %% -*- mode: erlang -*- {application, emqx_resource, [ {description, "Manager for all external resources"}, - {vsn, "0.1.31"}, + {vsn, "0.1.32"}, {registered, []}, {mod, {emqx_resource_app, []}}, {applications, [ diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine.app.src b/apps/emqx_rule_engine/src/emqx_rule_engine.app.src index b893d7c59..9fa17d0c4 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine.app.src +++ b/apps/emqx_rule_engine/src/emqx_rule_engine.app.src @@ -2,7 +2,7 @@ {application, emqx_rule_engine, [ {description, "EMQX Rule Engine"}, % strict semver, bump manually! - {vsn, "5.1.2"}, + {vsn, "5.1.3"}, {modules, []}, {registered, [emqx_rule_engine_sup, emqx_rule_engine]}, {applications, [ From 067beece758c3b3d0011a4c56578dc3689e54e28 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Fri, 28 Jun 2024 12:06:47 -0300 Subject: [PATCH 6/7] chore: add count to thrown error --- .../src/emqx_bridge_kafka_impl_producer.erl | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl index abdabef8c..b819925ac 100644 --- a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl +++ b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl @@ -324,12 +324,12 @@ on_query( }), do_send_msg(sync, KafkaMessage, Producers, SyncTimeout) catch - error:{invalid_partition_count, _Count, _Partitioner} -> + error:{invalid_partition_count, Count, _Partitioner} -> ?tp("kafka_producer_invalid_partition_count", #{ action_id => MessageTag, query_mode => sync }), - {error, {unrecoverable_error, invalid_partition_count}}; + {error, {unrecoverable_error, {invalid_partition_count, Count}}}; throw:{bad_kafka_header, _} = Error -> ?tp( emqx_bridge_kafka_impl_producer_sync_query_failed, @@ -390,12 +390,12 @@ on_query_async( }), do_send_msg(async, KafkaMessage, Producers, AsyncReplyFn) catch - error:{invalid_partition_count, _Count, _Partitioner} -> + error:{invalid_partition_count, Count, _Partitioner} -> ?tp("kafka_producer_invalid_partition_count", #{ action_id => MessageTag, query_mode => async }), - {error, {unrecoverable_error, invalid_partition_count}}; + {error, {unrecoverable_error, {invalid_partition_count, Count}}}; throw:{bad_kafka_header, _} = Error -> ?tp( emqx_bridge_kafka_impl_producer_async_query_failed, From fd49f66267254ec892d2eaa827f6b3ccd8571994 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Fri, 28 Jun 2024 12:07:30 -0300 Subject: [PATCH 7/7] docs: improve descriptions Co-authored-by: zmstone --- changes/ee/breaking-13327.en.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/changes/ee/breaking-13327.en.md b/changes/ee/breaking-13327.en.md index 645ab0c73..28d57d788 100644 --- a/changes/ee/breaking-13327.en.md +++ b/changes/ee/breaking-13327.en.md @@ -1,3 +1,3 @@ The directory path scheme for on-disk Kafka/Confluent/Azure Event Hub buffers has changed. It now uses the Action name instead of the topic name. -Updating to this version will invalidate (not use) old paths, and will require manual cleanup of the old directories. +Upgrading to this version will invalidate (not use) old buffer files, and will require manual cleanup of the old directories.