From bc5775a9888d08b348010c1dcac8aebb6e610fef Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Fri, 22 Mar 2024 09:43:29 -0300 Subject: [PATCH] fix(kafka_producer): handle ancient v1 config when migrating to actions Fixes https://emqx.atlassian.net/browse/EMQX-12064 --- .../src/emqx_bridge_kafka_action_info.erl | 11 ++++ .../test/emqx_bridge_kafka_tests.erl | 3 +- .../emqx_bridge_v2_kafka_producer_SUITE.erl | 60 +++++++++++++++++++ changes/ee/fix-12767.en.md | 1 + 4 files changed, 74 insertions(+), 1 deletion(-) create mode 100644 changes/ee/fix-12767.en.md diff --git a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_action_info.erl b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_action_info.erl index 2c7810028..726d6e85e 100644 --- a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_action_info.erl +++ b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_action_info.erl @@ -28,6 +28,17 @@ connector_action_config_to_bridge_v1_config(ConnectorConfig, ActionConfig) -> BridgeV1Config2 = emqx_utils_maps:deep_merge(ConnectorConfig, BridgeV1Config1), emqx_utils_maps:rename(<<"parameters">>, <<"kafka">>, BridgeV1Config2). +bridge_v1_config_to_action_config(BridgeV1Conf0 = #{<<"producer">> := _}, ConnectorName) -> + %% Ancient v1 config, when `kafka' key was wrapped by `producer' + BridgeV1Conf1 = emqx_utils_maps:unindent(<<"producer">>, BridgeV1Conf0), + BridgeV1Conf = + case maps:take(<<"mqtt">>, BridgeV1Conf1) of + {#{<<"topic">> := Topic}, BridgeV1Conf2} when is_binary(Topic) -> + BridgeV1Conf2#{<<"local_topic">> => Topic}; + _ -> + maps:remove(<<"mqtt">>, BridgeV1Conf1) + end, + bridge_v1_config_to_action_config(BridgeV1Conf, ConnectorName); bridge_v1_config_to_action_config(BridgeV1Conf, ConnectorName) -> Config0 = emqx_action_info:transform_bridge_v1_config_to_action_config( BridgeV1Conf, ConnectorName, schema_module(), kafka_producer diff --git a/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_tests.erl b/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_tests.erl index 9b597986e..2f20099ae 100644 --- a/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_tests.erl +++ b/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_tests.erl @@ -6,7 +6,8 @@ -include_lib("eunit/include/eunit.hrl"). --export([atoms/0]). +-export([atoms/0, kafka_producer_old_hocon/1]). + %% ensure atoms exist atoms() -> [myproducer, my_consumer]. diff --git a/apps/emqx_bridge_kafka/test/emqx_bridge_v2_kafka_producer_SUITE.erl b/apps/emqx_bridge_kafka/test/emqx_bridge_v2_kafka_producer_SUITE.erl index d883e5922..b51bd196c 100644 --- a/apps/emqx_bridge_kafka/test/emqx_bridge_v2_kafka_producer_SUITE.erl +++ b/apps/emqx_bridge_kafka/test/emqx_bridge_v2_kafka_producer_SUITE.erl @@ -36,6 +36,7 @@ all() -> emqx_common_test_helpers:all(?MODULE). init_per_suite(Config) -> + emqx_common_test_helpers:clear_screen(), ProxyHost = os:getenv("PROXY_HOST", "toxiproxy"), ProxyPort = list_to_integer(os:getenv("PROXY_PORT", "8474")), KafkaHost = os:getenv("KAFKA_PLAIN_HOST", "toxiproxy.emqx.net"), @@ -79,9 +80,22 @@ end_per_suite(Config) -> emqx_cth_suite:stop(Apps), ok. +init_per_testcase(t_ancient_v1_config_migration_with_local_topic = TestCase, Config) -> + Cluster = setup_cluster_ancient_config(TestCase, Config, #{with_local_topic => true}), + [{cluster, Cluster} | Config]; +init_per_testcase(t_ancient_v1_config_migration_without_local_topic = TestCase, Config) -> + Cluster = setup_cluster_ancient_config(TestCase, Config, #{with_local_topic => false}), + [{cluster, Cluster} | Config]; init_per_testcase(_TestCase, Config) -> Config. +end_per_testcase(TestCase, Config) when + TestCase =:= t_ancient_v1_config_migration_with_local_topic; + TestCase =:= t_ancient_v1_config_migration_without_local_topic +-> + Cluster = ?config(cluster, Config), + emqx_cth_cluster:stop(Cluster), + ok; end_per_testcase(_TestCase, Config) -> ProxyHost = ?config(proxy_host, Config), ProxyPort = ?config(proxy_port, Config), @@ -94,6 +108,32 @@ end_per_testcase(_TestCase, Config) -> %% Helper fns %%------------------------------------------------------------------------------------- +basic_node_conf(WorkDir) -> + #{ + <<"node">> => #{ + <<"cookie">> => erlang:get_cookie(), + <<"data_dir">> => unicode:characters_to_binary(WorkDir) + } + }. + +setup_cluster_ancient_config(TestCase, Config, #{with_local_topic := WithLocalTopic}) -> + AncientIOList = emqx_bridge_kafka_tests:kafka_producer_old_hocon(WithLocalTopic), + {ok, AncientCfg0} = hocon:binary(AncientIOList), + WorkDir = emqx_cth_suite:work_dir(TestCase, Config), + BasicConf = basic_node_conf(WorkDir), + AncientCfg = emqx_utils_maps:deep_merge(BasicConf, AncientCfg0), + Apps = [ + emqx, + emqx_conf, + emqx_connector, + emqx_bridge_kafka, + {emqx_bridge, #{schema_mod => emqx_enterprise_schema, config => AncientCfg}} + ], + emqx_cth_cluster:start( + [{kafka_producer_ancient_cfg1, #{apps => Apps}}], + #{work_dir => WorkDir} + ). + check_send_message_with_bridge(BridgeName) -> #{offset := Offset, payload := Payload} = send_message(BridgeName), %% ###################################### @@ -578,3 +618,23 @@ t_create_connector_while_connection_is_down(Config) -> [] ), ok. + +t_ancient_v1_config_migration_with_local_topic(Config) -> + %% Simply starting this test case successfully is enough, as the core of the test is + %% to be able to successfully start the node with the ancient config. + [Node] = ?config(cluster, Config), + ?assertMatch( + [#{type := <<"kafka_producer">>}], + erpc:call(Node, fun emqx_bridge_v2:list/0) + ), + ok. + +t_ancient_v1_config_migration_without_local_topic(Config) -> + %% Simply starting this test case successfully is enough, as the core of the test is + %% to be able to successfully start the node with the ancient config. + [Node] = ?config(cluster, Config), + ?assertMatch( + [#{type := <<"kafka_producer">>}], + erpc:call(Node, fun emqx_bridge_v2:list/0) + ), + ok. diff --git a/changes/ee/fix-12767.en.md b/changes/ee/fix-12767.en.md new file mode 100644 index 000000000..7a650a086 --- /dev/null +++ b/changes/ee/fix-12767.en.md @@ -0,0 +1 @@ +Correctly migrate older Kafka Producer configurations (pre 5.0.2) to action and connector configurations.