chore(kafka_producer): make schema changes more backwards compatible

This will still require fixes to the frontend.
Commit author: Thales Macedo Garitezi — 2023-03-09 15:06:16 -03:00
parent 8b1fa50413
commit f31f15e44e
8 changed files with 270 additions and 30 deletions

View File

@ -55,7 +55,10 @@
T == gcp_pubsub;
T == influxdb_api_v1;
T == influxdb_api_v2;
T == kafka_producer;
%% TODO: rename this to `kafka_producer' after alias support is
%% added to hocon; keeping this as just `kafka' for backwards
%% compatibility.
T == kafka;
T == redis_single;
T == redis_sentinel;
T == redis_cluster;

View File

@ -309,7 +309,9 @@ parse_confs(Type, Name, Conf) when ?IS_INGRESS_BRIDGE(Type) ->
%% receives a message from the external database.
BId = bridge_id(Type, Name),
Conf#{hookpoint => <<"$bridges/", BId/binary>>, bridge_name => Name};
parse_confs(<<"kafka_producer">> = _Type, Name, Conf) ->
%% TODO: rename this to `kafka_producer' after alias support is added
%% to hocon; keeping this as just `kafka' for backwards compatibility.
parse_confs(<<"kafka">> = _Type, Name, Conf) ->
Conf#{bridge_name => Name};
parse_confs(_Type, _Name, Conf) ->
Conf.

View File

@ -51,8 +51,8 @@ emqx_ee_bridge_kafka {
}
producer_opts {
desc {
en: "Local MQTT data source and Kafka bridge configs. Should not configure this if the bridge is used as a rule action."
zh: "本地 MQTT 数据源和 Kafka 桥接的配置。若该桥接用于规则的动作,则必须将该配置项删除。"
en: "Local MQTT data source and Kafka bridge configs."
zh: "本地 MQTT 数据源和 Kafka 桥接的配置。"
}
label {
en: "MQTT to Kafka"
@ -61,8 +61,8 @@ emqx_ee_bridge_kafka {
}
mqtt_topic {
desc {
en: "MQTT topic or topic as data source (bridge input)."
zh: "指定 MQTT 主题作为桥接的数据源"
en: "MQTT topic or topic as data source (bridge input). Should not configure this if the bridge is used as a rule action."
zh: "指定 MQTT 主题作为桥接的数据源。 若该桥接用于规则的动作,则必须将该配置项删除。"
}
label {
en: "Source MQTT Topic"

View File

@ -66,7 +66,9 @@ examples(Method) ->
resource_type(Type) when is_binary(Type) -> resource_type(binary_to_atom(Type, utf8));
resource_type(kafka_consumer) -> emqx_bridge_impl_kafka_consumer;
resource_type(kafka_producer) -> emqx_bridge_impl_kafka_producer;
%% TODO: rename this to `kafka_producer' after alias support is added
%% to hocon; keeping this as just `kafka' for backwards compatibility.
resource_type(kafka) -> emqx_bridge_impl_kafka_producer;
resource_type(hstreamdb) -> emqx_ee_connector_hstreamdb;
resource_type(gcp_pubsub) -> emqx_ee_connector_gcp_pubsub;
resource_type(mongodb_rs) -> emqx_ee_connector_mongodb;
@ -145,12 +147,23 @@ mongodb_structs() ->
kafka_structs() ->
[
{Type,
%% TODO: rename this to `kafka_producer' after alias support
%% is added to hocon; keeping this as just `kafka' for
%% backwards compatibility.
{kafka,
mk(
hoconsc:map(name, ref(emqx_ee_bridge_kafka, Type)),
#{desc => <<"Kafka ", Name/binary, " Bridge Config">>, required => false}
hoconsc:map(name, ref(emqx_ee_bridge_kafka, kafka_producer)),
#{
desc => <<"Kafka Producer Bridge Config">>,
required => false,
converter => fun emqx_ee_bridge_kafka:kafka_producer_converter/2
}
)},
{kafka_consumer,
mk(
hoconsc:map(name, ref(emqx_ee_bridge_kafka, kafka_consumer)),
#{desc => <<"Kafka Consumer Bridge Config">>, required => false}
)}
|| {Type, Name} <- [{kafka_producer, <<"Producer">>}, {kafka_consumer, <<"Consumer">>}]
].
influxdb_structs() ->

View File

@ -30,13 +30,18 @@
host_opts/0
]).
-export([kafka_producer_converter/2]).
%% -------------------------------------------------------------------------------------------------
%% api
conn_bridge_examples(Method) ->
[
#{
<<"kafka_producer">> => #{
%% TODO: rename this to `kafka_producer' after alias
%% support is added to hocon; keeping this as just `kafka'
%% for backwards compatibility.
<<"kafka">> => #{
summary => <<"Kafka Producer Bridge">>,
value => values(Method)
}
@ -171,7 +176,7 @@ fields(producer_opts) ->
%% Note: there's an implicit convention in `emqx_bridge' that,
%% for egress bridges with this config, the published messages
%% will be forwarded to such bridges.
{local_topic, mk(binary(), #{desc => ?DESC(mqtt_topic)})},
{local_topic, mk(binary(), #{required => false, desc => ?DESC(mqtt_topic)})},
{kafka,
mk(ref(producer_kafka_opts), #{
required => true,
@ -332,10 +337,33 @@ struct_names() ->
%% internal
type_field() ->
{type,
mk(enum([kafka_consumer, kafka_producer]), #{required => true, desc => ?DESC("desc_type")})}.
%% TODO: rename `kafka' to `kafka_producer' after alias
%% support is added to hocon; keeping this as just `kafka' for
%% backwards compatibility.
mk(enum([kafka_consumer, kafka]), #{required => true, desc => ?DESC("desc_type")})}.
%% Schema field for the mandatory bridge name (free-form binary).
name_field() ->
{name, mk(binary(), #{required => true, desc => ?DESC("desc_name")})}.
%% Shorthand for a hocon reference to a struct declared in this module.
ref(Name) ->
hoconsc:ref(?MODULE, Name).
%% @doc Hocon converter that upgrades old-style Kafka producer bridge
%% configs to the new flat layout.
%%
%% Old schema: the Kafka/MQTT options live nested under a `producer'
%% key (detected by the presence of both `producer' and
%% `bootstrap_hosts'). They are hoisted to the top level: the nested
%% `kafka' map becomes the top-level `kafka' key, and the MQTT source
%% topic (if any) becomes `local_topic'. New-schema configs (and
%% `undefined') pass through untouched.
kafka_producer_converter(undefined, _HoconOpts) ->
    undefined;
kafka_producer_converter(
    #{<<"producer">> := OldOpts, <<"bootstrap_hosts">> := _} = OldConfig, _HoconOpts
) ->
    %% Old schema: flatten `producer.kafka' / `producer.mqtt.topic'.
    KafkaOpts = maps:get(<<"kafka">>, OldOpts),
    Flattened = maps:put(<<"kafka">>, KafkaOpts, maps:remove(<<"producer">>, OldConfig)),
    case maps:get(<<"mqtt">>, OldOpts, #{}) of
        #{<<"topic">> := LocalTopic} when LocalTopic =/= undefined ->
            Flattened#{<<"local_topic">> => LocalTopic};
        _ ->
            %% No MQTT source topic configured; omit `local_topic'.
            Flattened
    end;
kafka_producer_converter(NewConfig, _HoconOpts) ->
    %% Already in the new (flat) schema; pass through unchanged.
    NewConfig.

View File

@ -22,6 +22,10 @@
-include_lib("emqx/include/logger.hrl").
%% TODO: rename this to `kafka_producer' after alias support is added
%% to hocon; keeping this as just `kafka' for backwards compatibility.
-define(BRIDGE_TYPE, kafka).
callback_mode() -> async_if_possible.
%% @doc Config schema is defined in emqx_ee_bridge_kafka.
@ -37,12 +41,11 @@ on_start(InstId, Config) ->
socket_opts := SocketOpts,
ssl := SSL
} = Config,
BridgeType = kafka_consumer,
BridgeType = ?BRIDGE_TYPE,
ResourceId = emqx_bridge_resource:resource_id(BridgeType, BridgeName),
_ = maybe_install_wolff_telemetry_handlers(ResourceId),
Hosts = emqx_bridge_impl_kafka:hosts(Hosts0),
KafkaType = kafka_producer,
ClientId = emqx_bridge_impl_kafka:make_client_id(KafkaType, BridgeName),
ClientId = emqx_bridge_impl_kafka:make_client_id(BridgeType, BridgeName),
ClientConfig = #{
min_metadata_refresh_interval => MinMetaRefreshInterval,
connect_timeout => ConnTimeout,
@ -315,7 +318,7 @@ producers_config(BridgeName, ClientId, Input, IsDryRun) ->
disk -> {false, replayq_dir(ClientId)};
hybrid -> {true, replayq_dir(ClientId)}
end,
BridgeType = kafka_producer,
BridgeType = ?BRIDGE_TYPE,
ResourceID = emqx_bridge_resource:resource_id(BridgeType, BridgeName),
#{
name => make_producer_name(BridgeName, IsDryRun),

View File

@ -30,16 +30,15 @@
-include_lib("emqx/include/emqx.hrl").
-include("emqx_dashboard.hrl").
-define(CONTENT_TYPE, "application/x-www-form-urlencoded").
-define(HOST, "http://127.0.0.1:18083").
%% -define(API_VERSION, "v5").
-define(BASE_PATH, "/api/v5").
-define(APP_DASHBOARD, emqx_dashboard).
-define(APP_MANAGEMENT, emqx_management).
%% TODO: rename this to `kafka_producer' after alias support is added
%% to hocon; keeping this as just `kafka' for backwards compatibility.
-define(BRIDGE_TYPE, "kafka").
%%------------------------------------------------------------------------------
%% CT boilerplate
@ -233,7 +232,7 @@ kafka_bridge_rest_api_all_auth_methods(UseSSL) ->
ok.
kafka_bridge_rest_api_helper(Config) ->
BridgeType = "kafka_producer",
BridgeType = ?BRIDGE_TYPE,
BridgeName = "my_kafka_bridge",
BridgeID = emqx_bridge_resource:bridge_id(
erlang:list_to_binary(BridgeType),
@ -244,6 +243,7 @@ kafka_bridge_rest_api_helper(Config) ->
erlang:list_to_binary(BridgeName)
),
UrlEscColon = "%3A",
BridgesProbeParts = ["bridges_probe"],
BridgeIdUrlEnc = BridgeType ++ UrlEscColon ++ BridgeName,
BridgesParts = ["bridges"],
BridgesPartsIdDeleteAlsoActions = ["bridges", BridgeIdUrlEnc ++ "?also_delete_dep_actions"],
@ -277,7 +277,7 @@ kafka_bridge_rest_api_helper(Config) ->
%% Create new Kafka bridge
KafkaTopic = "test-topic-one-partition",
CreateBodyTmp = #{
<<"type">> => <<"kafka_producer">>,
<<"type">> => <<?BRIDGE_TYPE>>,
<<"name">> => <<"my_kafka_bridge">>,
<<"bootstrap_hosts">> => iolist_to_binary(maps:get(<<"bootstrap_hosts">>, Config)),
<<"enable">> => true,
@ -300,6 +300,13 @@ kafka_bridge_rest_api_helper(Config) ->
{ok, 201, _Data} = show(http_post(BridgesParts, show(CreateBody))),
%% Check that the new bridge is in the list of bridges
true = MyKafkaBridgeExists(),
%% Probe should work
{ok, 204, _} = http_post(BridgesProbeParts, CreateBody),
%% no extra atoms should be created when probing
AtomsBefore = erlang:system_info(atom_count),
{ok, 204, _} = http_post(BridgesProbeParts, CreateBody),
AtomsAfter = erlang:system_info(atom_count),
?assertEqual(AtomsBefore, AtomsAfter),
%% Create a rule that uses the bridge
{ok, 201, _Rule} = http_post(
["rules"],
@ -377,10 +384,10 @@ t_failed_creation_then_fix(Config) ->
ValidAuthSettings = valid_sasl_plain_settings(),
WrongAuthSettings = ValidAuthSettings#{"password" := "wrong"},
Hash = erlang:phash2([HostsString, ?FUNCTION_NAME]),
Type = kafka_producer,
Type = ?BRIDGE_TYPE,
Name = "kafka_bridge_name_" ++ erlang:integer_to_list(Hash),
ResourceId = emqx_bridge_resource:resource_id("kafka_producer", Name),
BridgeId = emqx_bridge_resource:bridge_id("kafka_producer", Name),
ResourceId = emqx_bridge_resource:resource_id(Type, Name),
BridgeId = emqx_bridge_resource:bridge_id(Type, Name),
KafkaTopic = "test-topic-one-partition",
WrongConf = config(#{
"authentication" => WrongAuthSettings,
@ -511,7 +518,7 @@ publish_helper(
end,
Hash = erlang:phash2([HostsString, AuthSettings, SSLSettings]),
Name = "kafka_bridge_name_" ++ erlang:integer_to_list(Hash),
Type = "kafka_producer",
Type = ?BRIDGE_TYPE,
InstId = emqx_bridge_resource:resource_id(Type, Name),
KafkaTopic = "test-topic-one-partition",
Conf = config(
@ -526,7 +533,7 @@ publish_helper(
Conf0
),
{ok, _} = emqx_bridge:create(
<<"kafka_producer">>, list_to_binary(Name), Conf
<<?BRIDGE_TYPE>>, list_to_binary(Name), Conf
),
Time = erlang:unique_integer(),
BinTime = integer_to_binary(Time),
@ -600,8 +607,11 @@ hocon_config(Args) ->
%% erlfmt-ignore
hocon_config_template() ->
%% TODO: rename the type to `kafka_producer' after alias support is
%% added to hocon; keeping this as just `kafka' for backwards
%% compatibility.
"""
bridges.kafka_producer.{{ bridge_name }} {
bridges.kafka.{{ bridge_name }} {
bootstrap_hosts = \"{{ kafka_hosts_string }}\"
enable = true
authentication = {{{ authentication }}}

View File

@ -0,0 +1,181 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_ee_bridge_kafka_tests).
-include_lib("eunit/include/eunit.hrl").
%%===========================================================================
%% Test cases
%%===========================================================================
%% Verifies that both the old (nested `producer') and the new (flat)
%% Kafka producer bridge schemas are accepted by the bridge schema
%% check, and that `local_topic' only appears when the source config
%% actually carried an MQTT topic.
kafka_producer_test() ->
    OldConfNoTopic = parse(kafka_producer_old_hocon(false)),
    OldConfWithTopic = parse(kafka_producer_old_hocon(true)),
    NewConf = parse(kafka_producer_new_hocon()),
    CheckedOldNoTopic = check(OldConfNoTopic),
    CheckedOldWithTopic = check(OldConfWithTopic),
    CheckedNew = check(NewConf),
    %% Old schema without an MQTT topic: converted to the flat layout...
    ?assertMatch(
        #{
            <<"bridges">> :=
                #{<<"kafka">> := #{<<"myproducer">> := #{<<"kafka">> := #{}}}}
        },
        CheckedOldNoTopic
    ),
    %% ...and no `local_topic' is invented out of thin air.
    ?assertNotMatch(
        #{
            <<"bridges">> :=
                #{<<"kafka">> := #{<<"myproducer">> := #{<<"local_topic">> := _}}}
        },
        CheckedOldNoTopic
    ),
    %% Old schema with an MQTT topic: `local_topic' is filled in.
    ?assertMatch(
        #{
            <<"bridges">> :=
                #{
                    <<"kafka">> :=
                        #{
                            <<"myproducer">> :=
                                #{
                                    <<"kafka">> := #{},
                                    <<"local_topic">> := <<"mqtt/local">>
                                }
                        }
                }
        },
        CheckedOldWithTopic
    ),
    %% New (flat) schema passes through with `local_topic' preserved.
    ?assertMatch(
        #{
            <<"bridges">> :=
                #{
                    <<"kafka">> :=
                        #{
                            <<"myproducer">> :=
                                #{
                                    <<"kafka">> := #{},
                                    <<"local_topic">> := <<"mqtt/local">>
                                }
                        }
                }
        },
        CheckedNew
    ),
    ok.
%%===========================================================================
%% Helper functions
%%===========================================================================
%% Parse a HOCON text into a raw (unchecked) config map.
%% Crashes with a `badmatch' on syntax errors, which is the desired
%% behavior inside a test helper.
parse(Hocon) ->
    {ok, RawConf} = hocon:binary(Hocon),
    RawConf.
%% Run a raw config map through the bridge schema (converters included);
%% crashes on invalid input, so a plain return value means "valid".
check(Conf) when is_map(Conf) ->
hocon_tconf:check_plain(emqx_bridge_schema, Conf).
%%===========================================================================
%% Data section
%%===========================================================================
%% erlfmt-ignore
%% Sample config in the OLD schema (kafka/mqtt options nested under a
%% `producer' key), used to exercise the backwards-compatibility
%% converter. The boolean selects whether an MQTT source topic is set.
kafka_producer_old_hocon(_WithLocalTopic = true) ->
kafka_producer_old_hocon("mqtt {topic = \"mqtt/local\"}\n");
kafka_producer_old_hocon(_WithLocalTopic = false) ->
kafka_producer_old_hocon("mqtt {}\n");
%% Base template; `MQTTConfig' is spliced in verbatim as raw HOCON text.
kafka_producer_old_hocon(MQTTConfig) when is_list(MQTTConfig) ->
"""
bridges.kafka {
myproducer {
authentication = \"none\"
bootstrap_hosts = \"toxiproxy:9292\"
connect_timeout = \"5s\"
metadata_request_timeout = \"5s\"
min_metadata_refresh_interval = \"3s\"
producer {
kafka {
buffer {
memory_overload_protection = false
mode = \"memory\"
per_partition_limit = \"2GB\"
segment_bytes = \"100MB\"
}
compression = \"no_compression\"
max_batch_bytes = \"896KB\"
max_inflight = 10
message {
key = \"${.clientid}\"
timestamp = \"${.timestamp}\"
value = \"${.}\"
}
partition_count_refresh_interval = \"60s\"
partition_strategy = \"random\"
required_acks = \"all_isr\"
topic = \"test-topic-two-partitions\"
}
""" ++ MQTTConfig ++
"""
}
socket_opts {
nodelay = true
recbuf = \"1024KB\"
sndbuf = \"1024KB\"
}
ssl {enable = false, verify = \"verify_peer\"}
}
}
""".
%% Sample config in the NEW (flat) schema: `kafka' options and
%% `local_topic' live at the top level of the bridge config, with no
%% intermediate `producer' key. Must pass schema check unchanged.
kafka_producer_new_hocon() ->
""
"\n"
"bridges.kafka {\n"
" myproducer {\n"
" authentication = \"none\"\n"
" bootstrap_hosts = \"toxiproxy:9292\"\n"
" connect_timeout = \"5s\"\n"
" metadata_request_timeout = \"5s\"\n"
" min_metadata_refresh_interval = \"3s\"\n"
" kafka {\n"
" buffer {\n"
" memory_overload_protection = false\n"
" mode = \"memory\"\n"
" per_partition_limit = \"2GB\"\n"
" segment_bytes = \"100MB\"\n"
" }\n"
" compression = \"no_compression\"\n"
" max_batch_bytes = \"896KB\"\n"
" max_inflight = 10\n"
" message {\n"
" key = \"${.clientid}\"\n"
" timestamp = \"${.timestamp}\"\n"
" value = \"${.}\"\n"
" }\n"
" partition_count_refresh_interval = \"60s\"\n"
" partition_strategy = \"random\"\n"
" required_acks = \"all_isr\"\n"
" topic = \"test-topic-two-partitions\"\n"
" }\n"
" local_topic = \"mqtt/local\"\n"
" socket_opts {\n"
" nodelay = true\n"
" recbuf = \"1024KB\"\n"
" sndbuf = \"1024KB\"\n"
" }\n"
" ssl {enable = false, verify = \"verify_peer\"}\n"
" }\n"
"}\n"
"".