fix(kafka and derivatives): add `alias` config to avoid clashes with same topic

Fixes https://emqx.atlassian.net/browse/EMQX-12592
This commit is contained in:
Thales Macedo Garitezi 2024-06-24 10:02:26 -03:00
parent 7e089dce6b
commit 6023012f8b
11 changed files with 202 additions and 22 deletions

View File

@ -288,6 +288,14 @@ request(Method, Path, Params) ->
Error Error
end. end.
simplify_result(Res) ->
case Res of
{error, {{_, Status, _}, _, Body}} ->
{Status, Body};
{ok, {{_, Status, _}, _, Body}} ->
{Status, Body}
end.
list_bridges_api() -> list_bridges_api() ->
Params = [], Params = [],
Path = emqx_mgmt_api_test_util:api_path(["actions"]), Path = emqx_mgmt_api_test_util:api_path(["actions"]),
@ -321,7 +329,7 @@ get_bridge_api(BridgeKind, BridgeType, BridgeName) ->
Path = emqx_mgmt_api_test_util:api_path([Root, BridgeId]), Path = emqx_mgmt_api_test_util:api_path([Root, BridgeId]),
ct:pal("get bridge ~p (via http)", [{BridgeKind, BridgeType, BridgeName}]), ct:pal("get bridge ~p (via http)", [{BridgeKind, BridgeType, BridgeName}]),
Res = request(get, Path, Params), Res = request(get, Path, Params),
ct:pal("get bridge ~p result: ~p", [{BridgeKind, BridgeType, BridgeName}, Res]), ct:pal("get bridge ~p result:\n ~p", [{BridgeKind, BridgeType, BridgeName}, Res]),
Res. Res.
create_bridge_api(Config) -> create_bridge_api(Config) ->
@ -349,6 +357,26 @@ create_kind_api(Config, Overrides) ->
ct:pal("bridge create (~s, http) result:\n ~p", [Kind, Res]), ct:pal("bridge create (~s, http) result:\n ~p", [Kind, Res]),
Res. Res.
enable_kind_api(Kind, ConnectorType, ConnectorName) ->
do_enable_disable_kind_api(Kind, ConnectorType, ConnectorName, enable).
disable_kind_api(Kind, ConnectorType, ConnectorName) ->
do_enable_disable_kind_api(Kind, ConnectorType, ConnectorName, disable).
do_enable_disable_kind_api(Kind, Type, Name, Op) ->
BridgeId = emqx_bridge_resource:bridge_id(Type, Name),
RootBin = api_path_root(Kind),
{OpPath, OpStr} =
case Op of
enable -> {"true", "enable"};
disable -> {"false", "disable"}
end,
Path = emqx_mgmt_api_test_util:api_path([RootBin, BridgeId, "enable", OpPath]),
ct:pal(OpStr ++ " ~s ~s (http)", [Kind, BridgeId]),
Res = request(put, Path, []),
ct:pal(OpStr ++ " ~s ~s (http) result:\n ~p", [Kind, BridgeId, Res]),
simplify_result(Res).
create_connector_api(Config) -> create_connector_api(Config) ->
create_connector_api(Config, _Overrides = #{}). create_connector_api(Config, _Overrides = #{}).
@ -453,6 +481,15 @@ update_bridge_api(Config, Overrides) ->
ct:pal("update bridge (~s, http) result:\n ~p", [Kind, Res]), ct:pal("update bridge (~s, http) result:\n ~p", [Kind, Res]),
Res. Res.
delete_kind_api(Kind, Type, Name) ->
BridgeId = emqx_bridge_resource:bridge_id(Type, Name),
PathRoot = api_path_root(Kind),
Path = emqx_mgmt_api_test_util:api_path([PathRoot, BridgeId]),
ct:pal("deleting bridge (~s, http)", [Kind]),
Res = request(delete, Path, _Params = []),
ct:pal("delete bridge (~s, http) result:\n ~p", [Kind, Res]),
simplify_result(Res).
op_bridge_api(Op, BridgeType, BridgeName) -> op_bridge_api(Op, BridgeType, BridgeName) ->
op_bridge_api(_Kind = action, Op, BridgeType, BridgeName). op_bridge_api(_Kind = action, Op, BridgeType, BridgeName).

View File

@ -2,7 +2,7 @@
{erl_opts, [debug_info]}. {erl_opts, [debug_info]}.
{deps, [ {deps, [
{wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "1.10.5"}}}, {wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "1.10.6"}}},
{kafka_protocol, {git, "https://github.com/kafka4beam/kafka_protocol.git", {tag, "4.1.5"}}}, {kafka_protocol, {git, "https://github.com/kafka4beam/kafka_protocol.git", {tag, "4.1.5"}}},
{brod_gssapi, {git, "https://github.com/kafka4beam/brod_gssapi.git", {tag, "v0.1.1"}}}, {brod_gssapi, {git, "https://github.com/kafka4beam/brod_gssapi.git", {tag, "v0.1.1"}}},
{brod, {git, "https://github.com/kafka4beam/brod.git", {tag, "3.18.0"}}}, {brod, {git, "https://github.com/kafka4beam/brod.git", {tag, "3.18.0"}}},

View File

@ -40,6 +40,8 @@ init_per_suite(Config) ->
emqx, emqx,
emqx_management, emqx_management,
emqx_resource, emqx_resource,
%% Just for test helpers
brod,
emqx_bridge_azure_event_hub, emqx_bridge_azure_event_hub,
emqx_bridge, emqx_bridge,
emqx_rule_engine, emqx_rule_engine,
@ -93,6 +95,9 @@ common_init_per_testcase(TestCase, Config) ->
{connector_type, ?CONNECTOR_TYPE}, {connector_type, ?CONNECTOR_TYPE},
{connector_name, Name}, {connector_name, Name},
{connector_config, ConnectorConfig}, {connector_config, ConnectorConfig},
{action_type, ?BRIDGE_TYPE},
{action_name, Name},
{action_config, BridgeConfig},
{bridge_type, ?BRIDGE_TYPE}, {bridge_type, ?BRIDGE_TYPE},
{bridge_name, Name}, {bridge_name, Name},
{bridge_config, BridgeConfig} {bridge_config, BridgeConfig}
@ -100,18 +105,13 @@ common_init_per_testcase(TestCase, Config) ->
]. ].
end_per_testcase(_Testcase, Config) -> end_per_testcase(_Testcase, Config) ->
case proplists:get_bool(skip_does_not_apply, Config) of ProxyHost = ?config(proxy_host, Config),
true -> ProxyPort = ?config(proxy_port, Config),
ok; emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort),
false -> emqx_bridge_v2_testlib:delete_all_bridges_and_connectors(),
ProxyHost = ?config(proxy_host, Config), emqx_common_test_helpers:call_janitor(60_000),
ProxyPort = ?config(proxy_port, Config), ok = snabbkaffe:stop(),
emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort), ok.
emqx_bridge_v2_testlib:delete_all_bridges_and_connectors(),
emqx_common_test_helpers:call_janitor(60_000),
ok = snabbkaffe:stop(),
ok
end.
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% Helper fns %% Helper fns
@ -172,7 +172,7 @@ bridge_config(Name, ConnectorId, KafkaTopic) ->
#{ #{
<<"enable">> => true, <<"enable">> => true,
<<"connector">> => ConnectorId, <<"connector">> => ConnectorId,
<<"kafka">> => <<"parameters">> =>
#{ #{
<<"buffer">> => <<"buffer">> =>
#{ #{
@ -322,7 +322,7 @@ t_same_name_azure_kafka_bridges(Config) ->
), ),
%% then create a Kafka bridge with same name and delete it after creation %% then create a Kafka bridge with same name and delete it after creation
ConfigKafka0 = lists:keyreplace(bridge_type, 1, Config, {bridge_type, ?KAFKA_BRIDGE_TYPE}), ConfigKafka0 = lists:keyreplace(action_type, 1, Config, {action_type, ?KAFKA_BRIDGE_TYPE}),
ConfigKafka = lists:keyreplace( ConfigKafka = lists:keyreplace(
connector_type, 1, ConfigKafka0, {connector_type, ?KAFKA_BRIDGE_TYPE} connector_type, 1, ConfigKafka0, {connector_type, ?KAFKA_BRIDGE_TYPE}
), ),
@ -374,3 +374,20 @@ t_http_api_get(Config) ->
emqx_bridge_testlib:list_bridges_api() emqx_bridge_testlib:list_bridges_api()
), ),
ok. ok.
t_multiple_actions_sharing_topic(Config) ->
ActionConfig0 = ?config(action_config, Config),
ActionConfig =
emqx_utils_maps:deep_merge(
ActionConfig0,
#{<<"parameters">> => #{<<"query_mode">> => <<"sync">>}}
),
ok = emqx_bridge_v2_kafka_producer_SUITE:t_multiple_actions_sharing_topic(
[
{type, ?BRIDGE_TYPE_BIN},
{connector_name, ?config(connector_name, Config)},
{connector_config, ?config(connector_config, Config)},
{action_config, ActionConfig}
]
),
ok.

View File

@ -2,7 +2,7 @@
{erl_opts, [debug_info]}. {erl_opts, [debug_info]}.
{deps, [ {deps, [
{wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "1.10.5"}}}, {wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "1.10.6"}}},
{kafka_protocol, {git, "https://github.com/kafka4beam/kafka_protocol.git", {tag, "4.1.5"}}}, {kafka_protocol, {git, "https://github.com/kafka4beam/kafka_protocol.git", {tag, "4.1.5"}}},
{brod_gssapi, {git, "https://github.com/kafka4beam/brod_gssapi.git", {tag, "v0.1.1"}}}, {brod_gssapi, {git, "https://github.com/kafka4beam/brod_gssapi.git", {tag, "v0.1.1"}}},
{brod, {git, "https://github.com/kafka4beam/brod.git", {tag, "3.18.0"}}}, {brod, {git, "https://github.com/kafka4beam/brod.git", {tag, "3.18.0"}}},

View File

@ -40,6 +40,8 @@ init_per_suite(Config) ->
emqx, emqx,
emqx_management, emqx_management,
emqx_resource, emqx_resource,
%% Just for test helpers
brod,
emqx_bridge_confluent, emqx_bridge_confluent,
emqx_bridge, emqx_bridge,
emqx_rule_engine, emqx_rule_engine,
@ -93,6 +95,9 @@ common_init_per_testcase(TestCase, Config) ->
{connector_type, ?CONNECTOR_TYPE}, {connector_type, ?CONNECTOR_TYPE},
{connector_name, Name}, {connector_name, Name},
{connector_config, ConnectorConfig}, {connector_config, ConnectorConfig},
{action_type, ?ACTION_TYPE},
{action_name, Name},
{action_config, BridgeConfig},
{bridge_type, ?ACTION_TYPE}, {bridge_type, ?ACTION_TYPE},
{bridge_name, Name}, {bridge_name, Name},
{bridge_config, BridgeConfig} {bridge_config, BridgeConfig}
@ -306,7 +311,7 @@ t_same_name_confluent_kafka_bridges(Config) ->
), ),
%% then create a Kafka bridge with same name and delete it after creation %% then create a Kafka bridge with same name and delete it after creation
ConfigKafka0 = lists:keyreplace(bridge_type, 1, Config, {bridge_type, ?KAFKA_BRIDGE_TYPE}), ConfigKafka0 = lists:keyreplace(action_type, 1, Config, {action_type, ?KAFKA_BRIDGE_TYPE}),
ConfigKafka = lists:keyreplace( ConfigKafka = lists:keyreplace(
connector_type, 1, ConfigKafka0, {connector_type, ?KAFKA_BRIDGE_TYPE} connector_type, 1, ConfigKafka0, {connector_type, ?KAFKA_BRIDGE_TYPE}
), ),
@ -378,3 +383,20 @@ t_list_v1_bridges(Config) ->
[] []
), ),
ok. ok.
t_multiple_actions_sharing_topic(Config) ->
ActionConfig0 = ?config(action_config, Config),
ActionConfig =
emqx_utils_maps:deep_merge(
ActionConfig0,
#{<<"parameters">> => #{<<"query_mode">> => <<"sync">>}}
),
ok = emqx_bridge_v2_kafka_producer_SUITE:t_multiple_actions_sharing_topic(
[
{type, ?ACTION_TYPE_BIN},
{connector_name, ?config(connector_name, Config)},
{connector_config, ?config(connector_config, Config)},
{action_config, ActionConfig}
]
),
ok.

View File

@ -2,7 +2,7 @@
{erl_opts, [debug_info]}. {erl_opts, [debug_info]}.
{deps, [ {deps, [
{wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "1.10.5"}}}, {wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "1.10.6"}}},
{kafka_protocol, {git, "https://github.com/kafka4beam/kafka_protocol.git", {tag, "4.1.5"}}}, {kafka_protocol, {git, "https://github.com/kafka4beam/kafka_protocol.git", {tag, "4.1.5"}}},
{brod_gssapi, {git, "https://github.com/kafka4beam/brod_gssapi.git", {tag, "v0.1.1"}}}, {brod_gssapi, {git, "https://github.com/kafka4beam/brod_gssapi.git", {tag, "v0.1.1"}}},
{brod, {git, "https://github.com/kafka4beam/brod.git", {tag, "3.18.0"}}}, {brod, {git, "https://github.com/kafka4beam/brod.git", {tag, "3.18.0"}}},

View File

@ -324,6 +324,12 @@ on_query(
}), }),
do_send_msg(sync, KafkaMessage, Producers, SyncTimeout) do_send_msg(sync, KafkaMessage, Producers, SyncTimeout)
catch catch
error:{invalid_partition_count, _Count, _Partitioner} ->
?tp("kafka_producer_invalid_partition_count", #{
action_id => MessageTag,
query_mode => sync
}),
{error, invalid_partition_count};
throw:{bad_kafka_header, _} = Error -> throw:{bad_kafka_header, _} = Error ->
?tp( ?tp(
emqx_bridge_kafka_impl_producer_sync_query_failed, emqx_bridge_kafka_impl_producer_sync_query_failed,
@ -385,6 +391,10 @@ on_query_async(
do_send_msg(async, KafkaMessage, Producers, AsyncReplyFn) do_send_msg(async, KafkaMessage, Producers, AsyncReplyFn)
catch catch
error:{invalid_partition_count, _Count, _Partitioner} -> error:{invalid_partition_count, _Count, _Partitioner} ->
?tp("kafka_producer_invalid_partition_count", #{
action_id => MessageTag,
query_mode => async
}),
{error, invalid_partition_count}; {error, invalid_partition_count};
throw:{bad_kafka_header, _} = Error -> throw:{bad_kafka_header, _} = Error ->
?tp( ?tp(
@ -690,6 +700,7 @@ producers_config(BridgeType, BridgeName, Input, IsDryRun, BridgeV2Id) ->
max_batch_bytes => MaxBatchBytes, max_batch_bytes => MaxBatchBytes,
max_send_ahead => MaxInflight - 1, max_send_ahead => MaxInflight - 1,
compression => Compression, compression => Compression,
alias => BridgeV2Id,
telemetry_meta_data => #{bridge_id => BridgeV2Id}, telemetry_meta_data => #{bridge_id => BridgeV2Id},
max_partitions => MaxPartitions max_partitions => MaxPartitions
}. }.

View File

@ -142,6 +142,9 @@ check_send_message_with_bridge(BridgeName) ->
check_kafka_message_payload(Offset, Payload). check_kafka_message_payload(Offset, Payload).
send_message(ActionName) -> send_message(ActionName) ->
send_message(?TYPE, ActionName).
send_message(Type, ActionName) ->
%% ###################################### %% ######################################
%% Create Kafka message %% Create Kafka message
%% ###################################### %% ######################################
@ -157,8 +160,8 @@ send_message(ActionName) ->
%% ###################################### %% ######################################
%% Send message %% Send message
%% ###################################### %% ######################################
emqx_bridge_v2:send_message(?TYPE, ActionName, Msg, #{}), Res = emqx_bridge_v2:send_message(Type, ActionName, Msg, #{}),
#{offset => Offset, payload => Payload}. #{offset => Offset, payload => Payload, result => Res}.
resolve_kafka_offset() -> resolve_kafka_offset() ->
KafkaTopic = emqx_bridge_kafka_impl_producer_SUITE:test_topic_one_partition(), KafkaTopic = emqx_bridge_kafka_impl_producer_SUITE:test_topic_one_partition(),
@ -285,6 +288,18 @@ action_api_spec_props_for_get() ->
emqx_bridge_v2_testlib:actions_api_spec_schemas(), emqx_bridge_v2_testlib:actions_api_spec_schemas(),
Props. Props.
assert_status_api(Line, Type, Name, Status) ->
?assertMatch(
{ok,
{{_, 200, _}, _, #{
<<"status">> := Status,
<<"node_status">> := [#{<<"status">> := Status}]
}}},
emqx_bridge_v2_testlib:get_bridge_api(Type, Name),
#{line => Line, name => Name, expected_status => Status}
).
-define(assertStatusAPI(TYPE, NAME, STATUS), assert_status_api(?LINE, TYPE, NAME, STATUS)).
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% Testcases %% Testcases
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
@ -662,3 +677,77 @@ t_ancient_v1_config_migration_without_local_topic(Config) ->
erpc:call(Node, fun emqx_bridge_v2:list/0) erpc:call(Node, fun emqx_bridge_v2:list/0)
), ),
ok. ok.
%% Tests that deleting/disabling an action that share the same Kafka topic with other
%% actions do not disturb the latter.
t_multiple_actions_sharing_topic(Config) ->
Type = proplists:get_value(type, Config, ?TYPE),
ConnectorName = proplists:get_value(connector_name, Config, <<"c">>),
ConnectorConfig = proplists:get_value(connector_config, Config, connector_config()),
ActionConfig = proplists:get_value(action_config, Config, action_config(ConnectorName)),
?check_trace(
begin
ConnectorParams = [
{connector_config, ConnectorConfig},
{connector_name, ConnectorName},
{connector_type, Type}
],
ActionName1 = <<"a1">>,
ActionParams1 = [
{action_config, ActionConfig},
{action_name, ActionName1},
{action_type, Type}
],
ActionName2 = <<"a2">>,
ActionParams2 = [
{action_config, ActionConfig},
{action_name, ActionName2},
{action_type, Type}
],
{ok, {{_, 201, _}, _, #{}}} =
emqx_bridge_v2_testlib:create_connector_api(ConnectorParams),
{ok, {{_, 201, _}, _, #{}}} =
emqx_bridge_v2_testlib:create_action_api(ActionParams1),
{ok, {{_, 201, _}, _, #{}}} =
emqx_bridge_v2_testlib:create_action_api(ActionParams2),
RuleTopic = <<"t/a2">>,
{ok, _} = emqx_bridge_v2_testlib:create_rule_and_action_http(Type, RuleTopic, Config),
?assertStatusAPI(Type, ActionName1, <<"connected">>),
?assertStatusAPI(Type, ActionName2, <<"connected">>),
%% Disabling a1 shouldn't disturb a2.
?assertMatch(
{204, _}, emqx_bridge_v2_testlib:disable_kind_api(action, Type, ActionName1)
),
?assertStatusAPI(Type, ActionName1, <<"disconnected">>),
?assertStatusAPI(Type, ActionName2, <<"connected">>),
?assertMatch(#{result := ok}, send_message(Type, ActionName2)),
?assertStatusAPI(Type, ActionName2, <<"connected">>),
?assertMatch(
{204, _},
emqx_bridge_v2_testlib:enable_kind_api(action, Type, ActionName1)
),
?assertStatusAPI(Type, ActionName1, <<"connected">>),
?assertStatusAPI(Type, ActionName2, <<"connected">>),
?assertMatch(#{result := ok}, send_message(Type, ActionName2)),
%% Deleting also shouldn't disrupt a2.
?assertMatch(
{204, _},
emqx_bridge_v2_testlib:delete_kind_api(action, Type, ActionName1)
),
?assertStatusAPI(Type, ActionName2, <<"connected">>),
?assertMatch(#{result := ok}, send_message(Type, ActionName2)),
ok
end,
fun(Trace) ->
?assertEqual([], ?of_kind("kafka_producer_invalid_partition_count", Trace)),
ok
end
),
ok.

View File

@ -0,0 +1,3 @@
The directory path scheme for on-disk Kafka/Confluent/Azure Event Hub buffers has changed. It now uses the Action name instead of the topic name.
Updating to this version will invalidate (not use) old paths, and will require manual cleanup of the old directories.

View File

@ -0,0 +1 @@
Fixed an issue with Kafka, Confluent and Azure Event Hub bridges where different actions targeting the same topic could break one another when being deleted or disabled.

View File

@ -211,7 +211,7 @@ defmodule EMQXUmbrella.MixProject do
{:hstreamdb_erl, {:hstreamdb_erl,
github: "hstreamdb/hstreamdb_erl", tag: "0.5.18+v0.18.1+ezstd-v1.0.5-emqx1"}, github: "hstreamdb/hstreamdb_erl", tag: "0.5.18+v0.18.1+ezstd-v1.0.5-emqx1"},
{:influxdb, github: "emqx/influxdb-client-erl", tag: "1.1.13", override: true}, {:influxdb, github: "emqx/influxdb-client-erl", tag: "1.1.13", override: true},
{:wolff, github: "kafka4beam/wolff", tag: "1.10.5"}, {:wolff, github: "kafka4beam/wolff", tag: "1.10.6"},
{:kafka_protocol, github: "kafka4beam/kafka_protocol", tag: "4.1.5", override: true}, {:kafka_protocol, github: "kafka4beam/kafka_protocol", tag: "4.1.5", override: true},
{:brod_gssapi, github: "kafka4beam/brod_gssapi", tag: "v0.1.1"}, {:brod_gssapi, github: "kafka4beam/brod_gssapi", tag: "v0.1.1"},
{:brod, github: "kafka4beam/brod", tag: "3.18.0"}, {:brod, github: "kafka4beam/brod", tag: "3.18.0"},