Merge pull request #12767 from thalesmg/fix-older-kafka-migration-r56-20240322
fix(kafka_producer): handle ancient v1 config when migrating to actions
This commit is contained in:
commit
1218176e09
|
@ -28,6 +28,17 @@ connector_action_config_to_bridge_v1_config(ConnectorConfig, ActionConfig) ->
|
|||
BridgeV1Config2 = emqx_utils_maps:deep_merge(ConnectorConfig, BridgeV1Config1),
|
||||
emqx_utils_maps:rename(<<"parameters">>, <<"kafka">>, BridgeV1Config2).
|
||||
|
||||
bridge_v1_config_to_action_config(BridgeV1Conf0 = #{<<"producer">> := _}, ConnectorName) ->
|
||||
%% Ancient v1 config, when `kafka' key was wrapped by `producer'
|
||||
BridgeV1Conf1 = emqx_utils_maps:unindent(<<"producer">>, BridgeV1Conf0),
|
||||
BridgeV1Conf =
|
||||
case maps:take(<<"mqtt">>, BridgeV1Conf1) of
|
||||
{#{<<"topic">> := Topic}, BridgeV1Conf2} when is_binary(Topic) ->
|
||||
BridgeV1Conf2#{<<"local_topic">> => Topic};
|
||||
_ ->
|
||||
maps:remove(<<"mqtt">>, BridgeV1Conf1)
|
||||
end,
|
||||
bridge_v1_config_to_action_config(BridgeV1Conf, ConnectorName);
|
||||
bridge_v1_config_to_action_config(BridgeV1Conf, ConnectorName) ->
|
||||
Config0 = emqx_action_info:transform_bridge_v1_config_to_action_config(
|
||||
BridgeV1Conf, ConnectorName, schema_module(), kafka_producer
|
||||
|
|
|
@ -6,7 +6,8 @@
|
|||
|
||||
-include_lib("eunit/include/eunit.hrl").
|
||||
|
||||
-export([atoms/0]).
|
||||
-export([atoms/0, kafka_producer_old_hocon/1]).
|
||||
|
||||
%% ensure atoms exist
|
||||
atoms() -> [myproducer, my_consumer].
|
||||
|
||||
|
|
|
@ -36,6 +36,7 @@ all() ->
|
|||
emqx_common_test_helpers:all(?MODULE).
|
||||
|
||||
init_per_suite(Config) ->
|
||||
emqx_common_test_helpers:clear_screen(),
|
||||
ProxyHost = os:getenv("PROXY_HOST", "toxiproxy"),
|
||||
ProxyPort = list_to_integer(os:getenv("PROXY_PORT", "8474")),
|
||||
KafkaHost = os:getenv("KAFKA_PLAIN_HOST", "toxiproxy.emqx.net"),
|
||||
|
@ -79,9 +80,22 @@ end_per_suite(Config) ->
|
|||
emqx_cth_suite:stop(Apps),
|
||||
ok.
|
||||
|
||||
init_per_testcase(t_ancient_v1_config_migration_with_local_topic = TestCase, Config) ->
|
||||
Cluster = setup_cluster_ancient_config(TestCase, Config, #{with_local_topic => true}),
|
||||
[{cluster, Cluster} | Config];
|
||||
init_per_testcase(t_ancient_v1_config_migration_without_local_topic = TestCase, Config) ->
|
||||
Cluster = setup_cluster_ancient_config(TestCase, Config, #{with_local_topic => false}),
|
||||
[{cluster, Cluster} | Config];
|
||||
init_per_testcase(_TestCase, Config) ->
|
||||
Config.
|
||||
|
||||
end_per_testcase(TestCase, Config) when
|
||||
TestCase =:= t_ancient_v1_config_migration_with_local_topic;
|
||||
TestCase =:= t_ancient_v1_config_migration_without_local_topic
|
||||
->
|
||||
Cluster = ?config(cluster, Config),
|
||||
emqx_cth_cluster:stop(Cluster),
|
||||
ok;
|
||||
end_per_testcase(_TestCase, Config) ->
|
||||
ProxyHost = ?config(proxy_host, Config),
|
||||
ProxyPort = ?config(proxy_port, Config),
|
||||
|
@ -94,6 +108,32 @@ end_per_testcase(_TestCase, Config) ->
|
|||
%% Helper fns
|
||||
%%-------------------------------------------------------------------------------------
|
||||
|
||||
basic_node_conf(WorkDir) ->
|
||||
#{
|
||||
<<"node">> => #{
|
||||
<<"cookie">> => erlang:get_cookie(),
|
||||
<<"data_dir">> => unicode:characters_to_binary(WorkDir)
|
||||
}
|
||||
}.
|
||||
|
||||
setup_cluster_ancient_config(TestCase, Config, #{with_local_topic := WithLocalTopic}) ->
|
||||
AncientIOList = emqx_bridge_kafka_tests:kafka_producer_old_hocon(WithLocalTopic),
|
||||
{ok, AncientCfg0} = hocon:binary(AncientIOList),
|
||||
WorkDir = emqx_cth_suite:work_dir(TestCase, Config),
|
||||
BasicConf = basic_node_conf(WorkDir),
|
||||
AncientCfg = emqx_utils_maps:deep_merge(BasicConf, AncientCfg0),
|
||||
Apps = [
|
||||
emqx,
|
||||
emqx_conf,
|
||||
emqx_connector,
|
||||
emqx_bridge_kafka,
|
||||
{emqx_bridge, #{schema_mod => emqx_enterprise_schema, config => AncientCfg}}
|
||||
],
|
||||
emqx_cth_cluster:start(
|
||||
[{kafka_producer_ancient_cfg1, #{apps => Apps}}],
|
||||
#{work_dir => WorkDir}
|
||||
).
|
||||
|
||||
check_send_message_with_bridge(BridgeName) ->
|
||||
#{offset := Offset, payload := Payload} = send_message(BridgeName),
|
||||
%% ######################################
|
||||
|
@ -578,3 +618,23 @@ t_create_connector_while_connection_is_down(Config) ->
|
|||
[]
|
||||
),
|
||||
ok.
|
||||
|
||||
t_ancient_v1_config_migration_with_local_topic(Config) ->
|
||||
%% Simply starting this test case successfully is enough, as the core of the test is
|
||||
%% to be able to successfully start the node with the ancient config.
|
||||
[Node] = ?config(cluster, Config),
|
||||
?assertMatch(
|
||||
[#{type := <<"kafka_producer">>}],
|
||||
erpc:call(Node, fun emqx_bridge_v2:list/0)
|
||||
),
|
||||
ok.
|
||||
|
||||
t_ancient_v1_config_migration_without_local_topic(Config) ->
|
||||
%% Simply starting this test case successfully is enough, as the core of the test is
|
||||
%% to be able to successfully start the node with the ancient config.
|
||||
[Node] = ?config(cluster, Config),
|
||||
?assertMatch(
|
||||
[#{type := <<"kafka_producer">>}],
|
||||
erpc:call(Node, fun emqx_bridge_v2:list/0)
|
||||
),
|
||||
ok.
|
||||
|
|
|
@ -0,0 +1 @@
|
|||
Correctly migrate older Kafka Producer configurations (pre 5.0.2) to action and connector configurations.
|
Loading…
Reference in New Issue