Merge pull request #11989 from thalesmg/confluent-action-m-20231120

feat: add confluent connector/action
Thales Macedo Garitezi 2023-11-21 11:01:40 -03:00 committed by GitHub
commit 6c9a8461f7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 1478 additions and 15 deletions

View File

@@ -73,8 +73,9 @@
 -if(?EMQX_RELEASE_EDITION == ee).
 hard_coded_action_info_modules_ee() ->
     [
-        emqx_bridge_kafka_action_info,
         emqx_bridge_azure_event_hub_action_info,
+        emqx_bridge_confluent_producer_action_info,
+        emqx_bridge_kafka_action_info,
         emqx_bridge_syskeeper_action_info
     ].
 -else.

View File

@@ -0,0 +1,94 @@
Business Source License 1.1
Licensor: Hangzhou EMQ Technologies Co., Ltd.
Licensed Work: EMQX Enterprise Edition
The Licensed Work is (c) 2023
Hangzhou EMQ Technologies Co., Ltd.
Additional Use Grant: Students and educators are granted right to copy,
modify, and create derivative work for research
or education.
Change Date: 2027-02-01
Change License: Apache License, Version 2.0
For information about alternative licensing arrangements for the Software,
please contact Licensor: https://www.emqx.com/en/contact
Notice
The Business Source License (this document, or the “License”) is not an Open
Source license. However, the Licensed Work will eventually be made available
under an Open Source License, as stated in this License.
License text copyright (c) 2017 MariaDB Corporation Ab, All Rights Reserved.
“Business Source License” is a trademark of MariaDB Corporation Ab.
-----------------------------------------------------------------------------
Business Source License 1.1
Terms
The Licensor hereby grants you the right to copy, modify, create derivative
works, redistribute, and make non-production use of the Licensed Work. The
Licensor may make an Additional Use Grant, above, permitting limited
production use.
Effective on the Change Date, or the fourth anniversary of the first publicly
available distribution of a specific version of the Licensed Work under this
License, whichever comes first, the Licensor hereby grants you rights under
the terms of the Change License, and the rights granted in the paragraph
above terminate.
If your use of the Licensed Work does not comply with the requirements
currently in effect as described in this License, you must purchase a
commercial license from the Licensor, its affiliated entities, or authorized
resellers, or you must refrain from using the Licensed Work.
All copies of the original and modified Licensed Work, and derivative works
of the Licensed Work, are subject to this License. This License applies
separately for each version of the Licensed Work and the Change Date may vary
for each version of the Licensed Work released by Licensor.
You must conspicuously display this License on each original or modified copy
of the Licensed Work. If you receive the Licensed Work in original or
modified form from a third party, the terms and conditions set forth in this
License apply to your use of that work.
Any use of the Licensed Work in violation of this License will automatically
terminate your rights under this License for the current and all other
versions of the Licensed Work.
This License does not grant you any right in any trademark or logo of
Licensor or its affiliates (provided that you may use a trademark or logo of
Licensor as expressly required by this License).
TO THE EXTENT PERMITTED BY APPLICABLE LAW, THE LICENSED WORK IS PROVIDED ON
AN “AS IS” BASIS. LICENSOR HEREBY DISCLAIMS ALL WARRANTIES AND CONDITIONS,
EXPRESS OR IMPLIED, INCLUDING (WITHOUT LIMITATION) WARRANTIES OF
MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE, NON-INFRINGEMENT, AND
TITLE.
MariaDB hereby grants you permission to use this License’s text to license
your works, and to refer to it using the trademark “Business Source License”,
as long as you comply with the Covenants of Licensor below.
Covenants of Licensor
In consideration of the right to use this License’s text and the “Business
Source License” name and trademark, Licensor covenants to MariaDB, and to all
other recipients of the licensed work to be provided by Licensor:
1. To specify as the Change License the GPL Version 2.0 or any later version,
or a license that is compatible with GPL Version 2.0 or a later version,
where “compatible” means that software provided under the Change License can
be included in a program with software provided under GPL Version 2.0 or a
later version. Licensor may specify additional Change Licenses without
limitation.
2. To either: (a) specify an additional grant of rights to use that does not
impose any additional restriction on the right granted in this License, as
the Additional Use Grant; or (b) insert the text “None”.
3. To specify a Change Date.
4. Not to modify this License in any other way.

View File

@@ -0,0 +1,27 @@
# Confluent Data Integration Bridge
This application houses the Confluent Producer data integration bridge for EMQX Enterprise
Edition. It provides the means to connect to Confluent and publish messages to it via the
Kafka protocol.
Currently, our Kafka Producer library (`wolff`) has its own `replayq` buffering
implementation, so this bridge does not require buffer workers from `emqx_resource`. It
implements the connection management and interaction without the need for a separate
connector app, since it's not used by the authentication and authorization applications.
# Documentation links
For more information about the Kafka interface for Confluent, please see [the official
docs](https://docs.confluent.io/cloud/current/overview.html).
# Configurations
Please see [Ingest Data into Confluent](https://docs.emqx.com/en/enterprise/v5.3/data-integration/data-bridge-confluent.html) for more detailed information.
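For orientation, a minimal connector-plus-action pair in HOCON might look like the sketch
below. The endpoint, credentials, and names are placeholders; the field names follow this
application's schema:

```hocon
connectors.confluent_producer.my_confluent {
  # Confluent Cloud bootstrap endpoint (placeholder)
  bootstrap_hosts = "xyz.sa-east1.gcp.confluent.cloud:9092"
  authentication {
    # Confluent API key/secret pair (placeholders)
    username = "API_KEY"
    password = "API_SECRET"
  }
  ssl { enable = true }
}

actions.confluent_producer.my_action {
  connector = my_confluent
  parameters {
    topic = "my-kafka-topic"
    message {
      key = "${.clientid}"
      value = "${.}"
    }
  }
}
```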
# Contributing
Please see our [contributing.md](../../CONTRIBUTING.md).
# License
EMQ Business Source License 1.1, refer to [LICENSE](BSL.txt).

View File

@@ -0,0 +1,2 @@
toxiproxy
kafka

View File

@@ -0,0 +1,15 @@
%% -*- mode: erlang; -*-
{erl_opts, [debug_info]}.
{deps, [ {wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "1.8.0"}}}
, {kafka_protocol, {git, "https://github.com/kafka4beam/kafka_protocol.git", {tag, "4.1.3"}}}
, {brod_gssapi, {git, "https://github.com/kafka4beam/brod_gssapi.git", {tag, "v0.1.0"}}}
, {brod, {git, "https://github.com/kafka4beam/brod.git", {tag, "3.16.8"}}}
, {snappyer, "1.2.9"}
, {emqx_connector, {path, "../../apps/emqx_connector"}}
, {emqx_resource, {path, "../../apps/emqx_resource"}}
, {emqx_bridge, {path, "../../apps/emqx_bridge"}}
]}.
{shell, [
{apps, [emqx_bridge_confluent]}
]}.

View File

@@ -0,0 +1,15 @@
{application, emqx_bridge_confluent, [
{description, "EMQX Enterprise Confluent Connector and Action"},
{vsn, "0.1.0"},
{registered, []},
{applications, [
kernel,
stdlib,
emqx_resource,
telemetry,
wolff
]},
{env, []},
{modules, []},
{links, []}
]}.

View File

@@ -0,0 +1,406 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_bridge_confluent_producer).
-include_lib("typerefl/include/types.hrl").
-include_lib("hocon/include/hoconsc.hrl").
-behaviour(hocon_schema).
-behaviour(emqx_connector_resource).
%% `hocon_schema' API
-export([
namespace/0,
roots/0,
fields/1,
desc/1
]).
%% emqx_bridge_enterprise "unofficial" API
-export([
bridge_v2_examples/1,
connector_examples/1
]).
%% emqx_connector_resource behaviour callbacks
-export([connector_config/1]).
-export([host_opts/0]).
-import(hoconsc, [mk/2, enum/1, ref/2]).
-define(CONFLUENT_CONNECTOR_TYPE, confluent_producer).
-define(CONFLUENT_CONNECTOR_TYPE_BIN, <<"confluent_producer">>).
%%-------------------------------------------------------------------------------------------------
%% `hocon_schema' API
%%-------------------------------------------------------------------------------------------------
namespace() -> "confluent".
roots() -> ["config_producer"].
fields("put_connector") ->
Fields = override(
emqx_bridge_kafka:fields("put_connector"),
connector_overrides()
),
override_documentations(Fields);
fields("get_connector") ->
emqx_bridge_schema:status_fields() ++
fields("post_connector");
fields("post_connector") ->
Fields = override(
emqx_bridge_kafka:fields("post_connector"),
connector_overrides()
),
override_documentations(Fields);
fields("put_bridge_v2") ->
Fields = override(
emqx_bridge_kafka:fields("put_bridge_v2"),
bridge_v2_overrides()
),
override_documentations(Fields);
fields("get_bridge_v2") ->
emqx_bridge_schema:status_fields() ++
fields("post_bridge_v2");
fields("post_bridge_v2") ->
Fields = override(
emqx_bridge_kafka:fields("post_bridge_v2"),
bridge_v2_overrides()
),
override_documentations(Fields);
fields("config_bridge_v2") ->
fields(actions);
fields("config_connector") ->
Fields = override(
emqx_bridge_kafka:fields("config_connector"),
connector_overrides()
),
override_documentations(Fields);
fields(auth_username_password) ->
Fields = override(
emqx_bridge_kafka:fields(auth_username_password),
auth_overrides()
),
override_documentations(Fields);
fields(ssl_client_opts) ->
Fields = override(
emqx_bridge_kafka:ssl_client_opts_fields(),
ssl_overrides()
),
override_documentations(Fields);
fields(producer_kafka_opts) ->
Fields = override(
emqx_bridge_kafka:fields(producer_kafka_opts),
kafka_producer_overrides()
),
override_documentations(Fields);
fields(kafka_message) ->
Fields0 = emqx_bridge_kafka:fields(kafka_message),
Fields = proplists:delete(timestamp, Fields0),
override_documentations(Fields);
fields(action) ->
{confluent_producer,
mk(
hoconsc:map(name, ref(emqx_bridge_confluent_producer, actions)),
#{
desc => <<"Confluent Actions Config">>,
required => false
}
)};
fields(actions) ->
Fields =
override(
emqx_bridge_kafka:producer_opts(),
bridge_v2_overrides()
) ++
[
{enable, mk(boolean(), #{desc => ?DESC("config_enable"), default => true})},
{connector,
mk(binary(), #{
desc => ?DESC(emqx_connector_schema, "connector_field"), required => true
})},
{description, emqx_schema:description_schema()}
],
override_documentations(Fields);
fields(Method) ->
Fields = emqx_bridge_kafka:fields(Method),
override_documentations(Fields).
desc("config") ->
?DESC("desc_config");
desc("config_connector") ->
?DESC("desc_config");
desc("get_" ++ Type) when Type == "connector"; Type == "bridge_v2" ->
["Configuration for Confluent using `GET` method."];
desc("put_" ++ Type) when Type == "connector"; Type == "bridge_v2" ->
["Configuration for Confluent using `PUT` method."];
desc("post_" ++ Type) when Type == "connector"; Type == "bridge_v2" ->
["Configuration for Confluent using `POST` method."];
desc(Name) ->
lists:member(Name, struct_names()) orelse throw({missing_desc, Name}),
?DESC(Name).
struct_names() ->
[
auth_username_password,
kafka_message,
producer_kafka_opts,
actions,
ssl_client_opts
].
bridge_v2_examples(Method) ->
[
#{
?CONFLUENT_CONNECTOR_TYPE_BIN => #{
summary => <<"Confluent Action">>,
value => values({Method, bridge_v2})
}
}
].
connector_examples(Method) ->
[
#{
?CONFLUENT_CONNECTOR_TYPE_BIN => #{
summary => <<"Confluent Connector">>,
value => values({Method, connector})
}
}
].
values({get, ConfluentType}) ->
maps:merge(
#{
status => <<"connected">>,
node_status => [
#{
node => <<"emqx@localhost">>,
status => <<"connected">>
}
]
},
values({post, ConfluentType})
);
values({post, bridge_v2}) ->
maps:merge(
values(action),
#{
enable => true,
connector => <<"my_confluent_producer_connector">>,
name => <<"my_confluent_producer_action">>,
type => ?CONFLUENT_CONNECTOR_TYPE_BIN
}
);
values({post, connector}) ->
maps:merge(
values(common_config),
#{
name => <<"my_confluent_producer_connector">>,
type => ?CONFLUENT_CONNECTOR_TYPE_BIN,
ssl => #{
enable => true,
server_name_indication => <<"auto">>,
verify => <<"verify_none">>,
versions => [<<"tlsv1.3">>, <<"tlsv1.2">>]
}
}
);
values({put, connector}) ->
values(common_config);
values({put, bridge_v2}) ->
maps:merge(
values(action),
#{
enable => true,
connector => <<"my_confluent_producer_connector">>
}
);
values(common_config) ->
#{
authentication => #{
password => <<"******">>
},
bootstrap_hosts => <<"xyz.sa-east1.gcp.confluent.cloud:9092">>,
connect_timeout => <<"5s">>,
enable => true,
metadata_request_timeout => <<"4s">>,
min_metadata_refresh_interval => <<"3s">>,
socket_opts => #{
sndbuf => <<"1024KB">>,
recbuf => <<"1024KB">>,
nodelay => true,
tcp_keepalive => <<"none">>
}
};
values(action) ->
#{
parameters => #{
topic => <<"topic">>,
message => #{
key => <<"${.clientid}">>,
value => <<"${.}">>
},
max_batch_bytes => <<"896KB">>,
partition_strategy => <<"random">>,
required_acks => <<"all_isr">>,
partition_count_refresh_interval => <<"60s">>,
kafka_headers => <<"${.pub_props}">>,
kafka_ext_headers => [
#{
kafka_ext_header_key => <<"clientid">>,
kafka_ext_header_value => <<"${clientid}">>
},
#{
kafka_ext_header_key => <<"topic">>,
kafka_ext_header_value => <<"${topic}">>
}
],
kafka_header_value_encode_mode => none,
max_inflight => 10,
buffer => #{
mode => <<"hybrid">>,
per_partition_limit => <<"2GB">>,
segment_bytes => <<"100MB">>,
memory_overload_protection => true
}
},
local_topic => <<"mqtt/local/topic">>
}.
%%-------------------------------------------------------------------------------------------------
%% `emqx_connector_resource' API
%%-------------------------------------------------------------------------------------------------
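%% Hypothetical example: given a checked config such as
%%   #{bootstrap_hosts => <<"foo.confluent.cloud:9092,bar.confluent.cloud:9092">>}
%% the `bootstrap_hosts' binary is replaced with the structured server list
%% produced by `emqx_schema:parse_servers/2', which the underlying Kafka
%% producer implementation consumes.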
connector_config(Config) ->
%% Default port for Confluent is 9092
BootstrapHosts0 = maps:get(bootstrap_hosts, Config),
BootstrapHosts = emqx_schema:parse_servers(
BootstrapHosts0,
?MODULE:host_opts()
),
Config#{bootstrap_hosts := BootstrapHosts}.
%%-------------------------------------------------------------------------------------------------
%% Internal fns
%%-------------------------------------------------------------------------------------------------
ref(Name) ->
hoconsc:ref(?MODULE, Name).
connector_overrides() ->
#{
authentication =>
mk(
ref(auth_username_password),
#{
default => #{},
required => true,
desc => ?DESC("authentication")
}
),
bootstrap_hosts =>
mk(
binary(),
#{
required => true,
validator => emqx_schema:servers_validator(
host_opts(), _Required = true
)
}
),
ssl => mk(
ref(ssl_client_opts),
#{
required => true,
default => #{<<"enable">> => true}
}
),
type => mk(
?CONFLUENT_CONNECTOR_TYPE,
#{
required => true,
desc => ?DESC("connector_type")
}
)
}.
bridge_v2_overrides() ->
#{
parameters =>
mk(ref(producer_kafka_opts), #{
required => true,
validator => fun emqx_bridge_kafka:producer_strategy_key_validator/1
}),
ssl => mk(ref(ssl_client_opts), #{
default => #{
<<"enable">> => true,
<<"verify">> => <<"verify_none">>
}
}),
type => mk(
?CONFLUENT_CONNECTOR_TYPE,
#{
required => true,
desc => ?DESC("bridge_v2_type")
}
)
}.
auth_overrides() ->
#{
mechanism =>
mk(plain, #{
required => true,
default => plain,
importance => ?IMPORTANCE_HIDDEN
}),
username => mk(binary(), #{required => true}),
password => emqx_connector_schema_lib:password_field(#{required => true})
}.
%% Kafka has SSL disabled by default
%% Confluent must use SSL
ssl_overrides() ->
#{
"enable" => mk(true, #{default => true, importance => ?IMPORTANCE_HIDDEN}),
"verify" => mk(verify_none, #{default => verify_none, importance => ?IMPORTANCE_HIDDEN})
}.
kafka_producer_overrides() ->
#{
message => mk(ref(kafka_message), #{})
}.
override_documentations(Fields) ->
lists:map(
fun({Name, Sc}) ->
case hocon_schema:field_schema(Sc, desc) of
?DESC(emqx_bridge_kafka, Key) ->
%% to please dialyzer...
Override = #{type => hocon_schema:field_schema(Sc, type), desc => ?DESC(Key)},
{Name, hocon_schema:override(Sc, Override)};
_ ->
{Name, Sc}
end
end,
Fields
).
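%% Shallow per-field schema override: for every field name present in
%% `Overrides', apply the override map on top of that field's schema via
%% `hocon_schema:override/2'; fields without an entry are left untouched.
%% E.g. override([{ssl, Sc}], #{ssl => #{required => true}}) marks only
%% `ssl' as required.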
override(Fields, Overrides) ->
lists:map(
fun({Name, Sc}) ->
case maps:find(Name, Overrides) of
{ok, Override} ->
{Name, hocon_schema:override(Sc, Override)};
error ->
{Name, Sc}
end
end,
Fields
).
host_opts() ->
#{default_port => 9092}.

View File

@@ -0,0 +1,19 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_bridge_confluent_producer_action_info).
-behaviour(emqx_action_info).
-export([
action_type_name/0,
connector_type_name/0,
schema_module/0
]).
action_type_name() -> confluent_producer.
connector_type_name() -> confluent_producer.
schema_module() -> emqx_bridge_confluent_producer.

View File

@@ -0,0 +1,343 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_bridge_confluent_producer_SUITE).
-compile(nowarn_export_all).
-compile(export_all).
-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
-define(BRIDGE_TYPE, confluent_producer).
-define(BRIDGE_TYPE_BIN, <<"confluent_producer">>).
-define(CONNECTOR_TYPE, confluent_producer).
-define(CONNECTOR_TYPE_BIN, <<"confluent_producer">>).
-define(KAFKA_BRIDGE_TYPE, kafka_producer).
-import(emqx_common_test_helpers, [on_exit/1]).
%%------------------------------------------------------------------------------
%% CT boilerplate
%%------------------------------------------------------------------------------
all() ->
emqx_common_test_helpers:all(?MODULE).
init_per_suite(Config) ->
KafkaHost = os:getenv("KAFKA_SASL_SSL_HOST", "toxiproxy.emqx.net"),
KafkaPort = list_to_integer(os:getenv("KAFKA_SASL_SSL_PORT", "9295")),
ProxyHost = os:getenv("PROXY_HOST", "toxiproxy"),
ProxyPort = list_to_integer(os:getenv("PROXY_PORT", "8474")),
ProxyName = "kafka_sasl_ssl",
emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort),
case emqx_common_test_helpers:is_tcp_server_available(KafkaHost, KafkaPort) of
true ->
Apps = emqx_cth_suite:start(
[
emqx_conf,
emqx,
emqx_management,
emqx_resource,
emqx_bridge_confluent,
emqx_bridge,
emqx_rule_engine,
{emqx_dashboard, "dashboard.listeners.http { enable = true, bind = 18083 }"}
],
#{work_dir => ?config(priv_dir, Config)}
),
{ok, Api} = emqx_common_test_http:create_default_app(),
[
{tc_apps, Apps},
{api, Api},
{proxy_name, ProxyName},
{proxy_host, ProxyHost},
{proxy_port, ProxyPort},
{kafka_host, KafkaHost},
{kafka_port, KafkaPort}
| Config
];
false ->
case os:getenv("IS_CI") of
"yes" ->
throw(no_kafka);
_ ->
{skip, no_kafka}
end
end.
end_per_suite(Config) ->
Apps = ?config(tc_apps, Config),
emqx_cth_suite:stop(Apps),
ok.
init_per_testcase(TestCase, Config) ->
common_init_per_testcase(TestCase, Config).
common_init_per_testcase(TestCase, Config) ->
ct:timetrap(timer:seconds(60)),
emqx_bridge_v2_testlib:delete_all_bridges_and_connectors(),
emqx_config:delete_override_conf_files(),
UniqueNum = integer_to_binary(erlang:unique_integer()),
Name = iolist_to_binary([atom_to_binary(TestCase), UniqueNum]),
KafkaHost = ?config(kafka_host, Config),
KafkaPort = ?config(kafka_port, Config),
KafkaTopic = Name,
ConnectorConfig = connector_config(Name, KafkaHost, KafkaPort),
{BridgeConfig, ExtraConfig} = bridge_config(Name, Name, KafkaTopic),
ensure_topic(Config, KafkaTopic, _Opts = #{}),
ok = snabbkaffe:start_trace(),
ExtraConfig ++
[
{connector_type, ?CONNECTOR_TYPE},
{connector_name, Name},
{connector_config, ConnectorConfig},
{bridge_type, ?BRIDGE_TYPE},
{bridge_name, Name},
{bridge_config, BridgeConfig}
| Config
].
end_per_testcase(_Testcase, Config) ->
case proplists:get_bool(skip_does_not_apply, Config) of
true ->
ok;
false ->
ProxyHost = ?config(proxy_host, Config),
ProxyPort = ?config(proxy_port, Config),
emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort),
emqx_bridge_v2_testlib:delete_all_bridges_and_connectors(),
emqx_common_test_helpers:call_janitor(60_000),
ok = snabbkaffe:stop(),
ok
end.
%%------------------------------------------------------------------------------
%% Helper fns
%%------------------------------------------------------------------------------
connector_config(Name, KafkaHost, KafkaPort) ->
InnerConfigMap0 =
#{
<<"enable">> => true,
<<"bootstrap_hosts">> => iolist_to_binary([KafkaHost, ":", integer_to_binary(KafkaPort)]),
<<"authentication">> =>
#{
<<"mechanism">> => <<"plain">>,
<<"username">> => <<"emqxuser">>,
<<"password">> => <<"password">>
},
<<"connect_timeout">> => <<"5s">>,
<<"socket_opts">> =>
#{
<<"nodelay">> => true,
<<"recbuf">> => <<"1024KB">>,
<<"sndbuf">> => <<"1024KB">>,
<<"tcp_keepalive">> => <<"none">>
},
<<"ssl">> =>
#{
<<"cacertfile">> => shared_secret(client_cacertfile),
<<"certfile">> => shared_secret(client_certfile),
<<"keyfile">> => shared_secret(client_keyfile),
<<"ciphers">> => [],
<<"depth">> => 10,
<<"enable">> => true,
<<"hibernate_after">> => <<"5s">>,
<<"log_level">> => <<"notice">>,
<<"reuse_sessions">> => true,
<<"secure_renegotiate">> => true,
<<"server_name_indication">> => <<"disable">>,
%% currently, it seems our CI kafka certs fail peer verification
<<"verify">> => <<"verify_none">>,
<<"versions">> => [<<"tlsv1.3">>, <<"tlsv1.2">>]
}
},
InnerConfigMap = serde_roundtrip(InnerConfigMap0),
parse_and_check_connector_config(InnerConfigMap, Name).
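%% Validates the raw connector config against the schema. Note that the raw
%% (unchecked) map is returned, since the test cases feed the raw map to the
%% management APIs.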
parse_and_check_connector_config(InnerConfigMap, Name) ->
TypeBin = ?CONNECTOR_TYPE_BIN,
RawConf = #{<<"connectors">> => #{TypeBin => #{Name => InnerConfigMap}}},
#{<<"connectors">> := #{TypeBin := #{Name := Config}}} =
hocon_tconf:check_plain(emqx_connector_schema, RawConf, #{
required => false, atom_key => false
}),
ct:pal("parsed config: ~p", [Config]),
InnerConfigMap.
bridge_config(Name, ConnectorId, KafkaTopic) ->
InnerConfigMap0 =
#{
<<"enable">> => true,
<<"connector">> => ConnectorId,
<<"parameters">> =>
#{
<<"buffer">> =>
#{
<<"memory_overload_protection">> => true,
<<"mode">> => <<"memory">>,
<<"per_partition_limit">> => <<"2GB">>,
<<"segment_bytes">> => <<"100MB">>
},
<<"compression">> => <<"no_compression">>,
<<"kafka_header_value_encode_mode">> => <<"none">>,
<<"max_batch_bytes">> => <<"896KB">>,
<<"max_inflight">> => <<"10">>,
<<"message">> =>
#{
<<"key">> => <<"${.clientid}">>,
<<"value">> => <<"${.}">>
},
<<"partition_count_refresh_interval">> => <<"60s">>,
<<"partition_strategy">> => <<"random">>,
<<"query_mode">> => <<"async">>,
<<"required_acks">> => <<"all_isr">>,
<<"sync_query_timeout">> => <<"5s">>,
<<"topic">> => KafkaTopic
},
<<"local_topic">> => <<"t/confluent">>
},
InnerConfigMap = serde_roundtrip(InnerConfigMap0),
ExtraConfig =
[{kafka_topic, KafkaTopic}],
{parse_and_check_bridge_config(InnerConfigMap, Name), ExtraConfig}.
%% check it serializes correctly
serde_roundtrip(InnerConfigMap0) ->
IOList = hocon_pp:do(InnerConfigMap0, #{}),
{ok, InnerConfigMap} = hocon:binary(IOList),
InnerConfigMap.
parse_and_check_bridge_config(InnerConfigMap, Name) ->
TypeBin = ?BRIDGE_TYPE_BIN,
RawConf = #{<<"bridges">> => #{TypeBin => #{Name => InnerConfigMap}}},
hocon_tconf:check_plain(emqx_bridge_v2_schema, RawConf, #{required => false, atom_key => false}),
InnerConfigMap.
shared_secret_path() ->
os:getenv("CI_SHARED_SECRET_PATH", "/var/lib/secret").
shared_secret(client_keyfile) ->
filename:join([shared_secret_path(), "client.key"]);
shared_secret(client_certfile) ->
filename:join([shared_secret_path(), "client.crt"]);
shared_secret(client_cacertfile) ->
filename:join([shared_secret_path(), "ca.crt"]);
shared_secret(rig_keytab) ->
filename:join([shared_secret_path(), "rig.keytab"]).
ensure_topic(Config, KafkaTopic, Opts) ->
KafkaHost = ?config(kafka_host, Config),
KafkaPort = ?config(kafka_port, Config),
NumPartitions = maps:get(num_partitions, Opts, 3),
Endpoints = [{KafkaHost, KafkaPort}],
TopicConfigs = [
#{
name => KafkaTopic,
num_partitions => NumPartitions,
replication_factor => 1,
assignments => [],
configs => []
}
],
RequestConfig = #{timeout => 5_000},
ConnConfig =
#{
ssl => emqx_tls_lib:to_client_opts(
#{
keyfile => shared_secret(client_keyfile),
certfile => shared_secret(client_certfile),
cacertfile => shared_secret(client_cacertfile),
verify => verify_none,
enable => true
}
),
sasl => {plain, <<"emqxuser">>, <<"password">>}
},
case brod:create_topics(Endpoints, TopicConfigs, RequestConfig, ConnConfig) of
ok -> ok;
{error, topic_already_exists} -> ok
end.
make_message() ->
Time = erlang:unique_integer(),
BinTime = integer_to_binary(Time),
Payload = emqx_guid:to_hexstr(emqx_guid:gen()),
#{
clientid => BinTime,
payload => Payload,
timestamp => Time
}.
%%------------------------------------------------------------------------------
%% Testcases
%%------------------------------------------------------------------------------
t_start_stop(Config) ->
emqx_bridge_v2_testlib:t_start_stop(Config, kafka_producer_stopped),
ok.
t_create_via_http(Config) ->
emqx_bridge_v2_testlib:t_create_via_http(Config),
ok.
t_on_get_status(Config) ->
emqx_bridge_v2_testlib:t_on_get_status(Config, #{failure_status => connecting}),
ok.
t_sync_query(Config) ->
ok = emqx_bridge_v2_testlib:t_sync_query(
Config,
fun make_message/0,
fun(Res) -> ?assertEqual(ok, Res) end,
emqx_bridge_kafka_impl_producer_sync_query
),
ok.
t_same_name_confluent_kafka_bridges(Config) ->
BridgeName = ?config(bridge_name, Config),
TracePoint = emqx_bridge_kafka_impl_producer_sync_query,
%% create the Confluent action and check that it's working
ok = emqx_bridge_v2_testlib:t_sync_query(
Config,
fun make_message/0,
fun(Res) -> ?assertEqual(ok, Res) end,
TracePoint
),
%% then create a Kafka bridge with the same name and delete it after creation
ConfigKafka0 = lists:keyreplace(bridge_type, 1, Config, {bridge_type, ?KAFKA_BRIDGE_TYPE}),
ConfigKafka = lists:keyreplace(
connector_type, 1, ConfigKafka0, {connector_type, ?KAFKA_BRIDGE_TYPE}
),
ok = emqx_bridge_v2_testlib:t_create_via_http(ConfigKafka),
ConfluentResourceId = emqx_bridge_v2_testlib:resource_id(Config),
KafkaResourceId = emqx_bridge_v2_testlib:resource_id(ConfigKafka),
%% check that both bridges are healthy
?assertEqual({ok, connected}, emqx_resource_manager:health_check(ConfluentResourceId)),
?assertEqual({ok, connected}, emqx_resource_manager:health_check(KafkaResourceId)),
?assertMatch(
{{ok, _}, {ok, _}},
?wait_async_action(
emqx_connector:disable_enable(disable, ?KAFKA_BRIDGE_TYPE, BridgeName),
#{?snk_kind := kafka_producer_stopped},
5_000
)
),
%% check that the Confluent action is still working
?check_trace(
begin
BridgeId = emqx_bridge_v2_testlib:bridge_id(Config),
Message = {BridgeId, make_message()},
?assertEqual(ok, emqx_resource:simple_sync_query(ConfluentResourceId, Message)),
ok
end,
fun(Trace) ->
?assertMatch([#{instance_id := ConfluentResourceId}], ?of_kind(TracePoint, Trace))
end
),
ok.

View File

@@ -0,0 +1,179 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_bridge_confluent_tests).
-include_lib("eunit/include/eunit.hrl").
%%===========================================================================
%% Data Section
%%===========================================================================
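%% Note on the string style below: `"""' is not an OTP triple-quoted string
%% here; it tokenizes as an empty string literal followed by one multi-line
%% string literal (with inner quotes escaped), and adjacent string literals
%% are concatenated at compile time.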
%% erlfmt-ignore
confluent_producer_action_hocon() ->
"""
actions.confluent_producer.my_producer {
enable = true
connector = my_connector
parameters {
buffer {
memory_overload_protection = false
mode = memory
per_partition_limit = 2GB
segment_bytes = 100MB
}
compression = no_compression
kafka_header_value_encode_mode = none
max_batch_bytes = 896KB
max_inflight = 10
message {
key = \"${.clientid}\"
value = \"${.}\"
}
partition_count_refresh_interval = 60s
partition_strategy = random
query_mode = async
required_acks = all_isr
sync_query_timeout = 5s
topic = test
}
local_topic = \"t/confluent\"
}
""".
%% erlfmt-ignore
confluent_producer_connector_hocon() ->
"""
connectors.confluent_producer.my_producer {
  enable = true
  authentication {
    username = \"user\"
    password = \"xxx\"
  }
  bootstrap_hosts = \"xyz.sa-east1.gcp.confluent.cloud:9092\"
  connect_timeout = 5s
  metadata_request_timeout = 5s
  min_metadata_refresh_interval = 3s
  socket_opts {
    recbuf = 1024KB
    sndbuf = 1024KB
    tcp_keepalive = none
  }
}
""".
%%===========================================================================
%% Helper functions
%%===========================================================================
parse(Hocon) ->
{ok, Conf} = hocon:binary(Hocon),
Conf.
check(SchemaMod, Conf) when is_map(Conf) ->
hocon_tconf:check_plain(SchemaMod, Conf).
check_action(Conf) when is_map(Conf) ->
check(emqx_bridge_v2_schema, Conf).
check_connector(Conf) when is_map(Conf) ->
check(emqx_connector_schema, Conf).
-define(validation_error(SchemaMod, Reason, Value),
{SchemaMod, [
#{
kind := validation_error,
reason := Reason,
value := Value
}
]}
).
-define(action_validation_error(Reason, Value),
?validation_error(emqx_bridge_v2_schema, Reason, Value)
).
-define(connector_validation_error(Reason, Value),
?validation_error(emqx_connector_schema, Reason, Value)
).
-define(ok_config(RootKey, Cfg), #{
RootKey :=
#{
<<"confluent_producer">> :=
#{
<<"my_producer">> :=
Cfg
}
}
}).
-define(ok_connector_config(Cfg), ?ok_config(<<"connectors">>, Cfg)).
-define(ok_action_config(Cfg), ?ok_config(<<"actions">>, Cfg)).
%%===========================================================================
%% Test cases
%%===========================================================================
confluent_producer_connector_test_() ->
%% ensure this module is loaded when testing only this file
_ = emqx_bridge_enterprise:module_info(),
BaseConf = parse(confluent_producer_connector_hocon()),
Override = fun(Cfg) ->
emqx_utils_maps:deep_merge(
BaseConf,
#{
<<"connectors">> =>
#{
<<"confluent_producer">> =>
#{<<"my_producer">> => Cfg}
}
}
)
end,
[
{"base config",
?_assertMatch(
?ok_connector_config(
#{
<<"authentication">> := #{
<<"mechanism">> := plain
},
<<"ssl">> := #{
<<"enable">> := true,
<<"verify">> := verify_none
}
}
),
check_connector(BaseConf)
)},
{"ssl disabled",
?_assertThrow(
?connector_validation_error(#{expected := "true"}, "false"),
check_connector(Override(#{<<"ssl">> => #{<<"enable">> => <<"false">>}}))
)},
{"bad authn mechanism: scram sha256",
?_assertThrow(
?connector_validation_error(#{expected := "plain"}, "scram_sha_256"),
check_connector(
Override(#{<<"authentication">> => #{<<"mechanism">> => <<"scram_sha_256">>}})
)
)},
{"bad authn mechanism: scram sha512",
?_assertThrow(
?connector_validation_error(#{expected := "plain"}, "scram_sha_512"),
check_connector(
Override(#{<<"authentication">> => #{<<"mechanism">> => <<"scram_sha_512">>}})
)
)}
].
confluent_producer_action_test_() ->
%% ensure this module is loaded when testing only this file
_ = emqx_bridge_enterprise:module_info(),
BaseConf = parse(confluent_producer_action_hocon()),
[
{"base config",
?_assertMatch(
?ok_action_config(_),
check_action(BaseConf)
)}
].

View File

@@ -20,11 +20,13 @@
 resource_type(Type) when is_binary(Type) ->
     resource_type(binary_to_atom(Type, utf8));
-resource_type(kafka_producer) ->
-    emqx_bridge_kafka_impl_producer;
 %% We use AEH's Kafka interface.
 resource_type(azure_event_hub_producer) ->
     emqx_bridge_kafka_impl_producer;
+resource_type(confluent_producer) ->
+    emqx_bridge_kafka_impl_producer;
+resource_type(kafka_producer) ->
+    emqx_bridge_kafka_impl_producer;
 resource_type(syskeeper_forwarder) ->
     emqx_bridge_syskeeper_connector;
 resource_type(syskeeper_proxy) ->
@@ -37,6 +39,8 @@ connector_impl_module(ConnectorType) when is_binary(ConnectorType) ->
     connector_impl_module(binary_to_atom(ConnectorType, utf8));
 connector_impl_module(azure_event_hub_producer) ->
     emqx_bridge_azure_event_hub;
+connector_impl_module(confluent_producer) ->
+    emqx_bridge_confluent_producer;
 connector_impl_module(_ConnectorType) ->
     undefined.
@@ -45,14 +49,6 @@ fields(connectors) ->
 connector_structs() ->
     [
-        {kafka_producer,
-            mk(
-                hoconsc:map(name, ref(emqx_bridge_kafka, "config_connector")),
-                #{
-                    desc => <<"Kafka Connector Config">>,
-                    required => false
-                }
-            )},
         {azure_event_hub_producer,
             mk(
                 hoconsc:map(name, ref(emqx_bridge_azure_event_hub, "config_connector")),
@@ -61,6 +57,22 @@ connector_structs() ->
                 required => false
             }
         )},
+        {confluent_producer,
+            mk(
+                hoconsc:map(name, ref(emqx_bridge_confluent_producer, "config_connector")),
+                #{
+                    desc => <<"Confluent Connector Config">>,
+                    required => false
+                }
+            )},
+        {kafka_producer,
+            mk(
+                hoconsc:map(name, ref(emqx_bridge_kafka, "config_connector")),
+                #{
+                    desc => <<"Kafka Connector Config">>,
+                    required => false
+                }
+            )},
         {syskeeper_forwarder,
             mk(
                 hoconsc:map(name, ref(emqx_bridge_syskeeper_connector, config)),
@@ -93,8 +105,9 @@ examples(Method) ->
 schema_modules() ->
     [
-        emqx_bridge_kafka,
         emqx_bridge_azure_event_hub,
+        emqx_bridge_confluent_producer,
+        emqx_bridge_kafka,
         emqx_bridge_syskeeper_connector,
         emqx_bridge_syskeeper_proxy
     ].
@@ -103,10 +116,13 @@
     [
         %% We need to map the `type' field of a request (binary) to a
         %% connector schema module.
-        api_ref(emqx_bridge_kafka, <<"kafka_producer">>, Method ++ "_connector"),
         api_ref(
             emqx_bridge_azure_event_hub, <<"azure_event_hub_producer">>, Method ++ "_connector"
         ),
+        api_ref(
+            emqx_bridge_confluent_producer, <<"confluent_producer">>, Method ++ "_connector"
+        ),
+        api_ref(emqx_bridge_kafka, <<"kafka_producer">>, Method ++ "_connector"),
         api_ref(emqx_bridge_syskeeper_connector, <<"syskeeper_forwarder">>, Method),
         api_ref(emqx_bridge_syskeeper_proxy, <<"syskeeper_proxy">>, Method)
     ].

View File

@@ -62,8 +62,9 @@ enterprise_fields_connectors() -> [].
 -endif.
-connector_type_to_bridge_types(kafka_producer) -> [kafka, kafka_producer];
 connector_type_to_bridge_types(azure_event_hub_producer) -> [azure_event_hub_producer];
+connector_type_to_bridge_types(confluent_producer) -> [confluent_producer];
+connector_type_to_bridge_types(kafka_producer) -> [kafka, kafka_producer];
 connector_type_to_bridge_types(syskeeper_forwarder) -> [syskeeper_forwarder];
 connector_type_to_bridge_types(syskeeper_proxy) -> [].

View File

@@ -129,7 +129,8 @@
     emqx_gateway_gbt32960,
     emqx_gateway_ocpp,
     emqx_gateway_jt808,
-    emqx_bridge_syskeeper
+    emqx_bridge_syskeeper,
+    emqx_bridge_confluent
 ],
 %% must always be of type `load'
 ce_business_apps =>

View File

@@ -183,6 +183,7 @@ defmodule EMQXUmbrella.MixProject do
   defp enterprise_umbrella_apps() do
     MapSet.new([
       :emqx_bridge_kafka,
+      :emqx_bridge_confluent,
       :emqx_bridge_gcp_pubsub,
       :emqx_bridge_cassandra,
       :emqx_bridge_opents,

View File

@@ -79,6 +79,7 @@ is_enterprise(ce) -> false;
 is_enterprise(ee) -> true.
 is_community_umbrella_app("apps/emqx_bridge_kafka") -> false;
+is_community_umbrella_app("apps/emqx_bridge_confluent") -> false;
 is_community_umbrella_app("apps/emqx_bridge_gcp_pubsub") -> false;
 is_community_umbrella_app("apps/emqx_bridge_cassandra") -> false;
 is_community_umbrella_app("apps/emqx_bridge_opents") -> false;

View File

@@ -0,0 +1,342 @@
emqx_bridge_confluent_producer {
connect_timeout.desc:
"""Maximum wait time for TCP connection establishment (including authentication time if enabled)."""
connect_timeout.label:
"""Connect Timeout"""
producer_opts.desc:
"""Local MQTT data source and Confluent bridge configs."""
producer_opts.label:
"""MQTT to Confluent"""
min_metadata_refresh_interval.desc:
"""Minimum time interval the client has to wait before refreshing Confluent Kafka broker and topic metadata. Setting a value that is too small may add extra load on Confluent."""
min_metadata_refresh_interval.label:
"""Min Metadata Refresh Interval"""
kafka_producer.desc:
"""Confluent Producer configuration."""
kafka_producer.label:
"""Confluent Producer"""
producer_buffer.desc:
"""Configure producer message buffer.
Tell Confluent producer how to buffer messages when EMQX has more messages to send than Confluent can keep up, or when Confluent is down."""
producer_buffer.label:
"""Message Buffer"""
socket_send_buffer.desc:
"""Fine tune the socket send buffer. The default value is tuned for high throughput."""
socket_send_buffer.label:
"""Socket Send Buffer Size"""
socket_receive_buffer.desc:
"""Fine tune the socket receive buffer. The default value is tuned for high throughput."""
socket_receive_buffer.label:
"""Socket Receive Buffer Size"""
socket_tcp_keepalive.desc:
"""Enable TCP keepalive for Confluent bridge connections.
The value is three comma separated numbers in the format of 'Idle,Interval,Probes'
- Idle: The number of seconds a connection needs to be idle before the server begins to send out keep-alive probes (Linux default 7200).
- Interval: The number of seconds between TCP keep-alive probes (Linux default 75).
- Probes: The maximum number of TCP keep-alive probes to send before giving up and killing the connection if no response is obtained from the other end (Linux default 9).
For example "240,30,5" means: TCP keepalive probes are sent after the connection is idle for 240 seconds, and the probes are sent every 30 seconds until a response is received, if it misses 5 consecutive responses, the connection should be closed.
Default: 'none'"""
socket_tcp_keepalive.label:
"""TCP keepalive options"""
desc_name.desc:
"""Action name, used as a human-readable description of the action."""
desc_name.label:
"""Action Name"""
producer_kafka_opts.desc:
"""Confluent producer configs."""
producer_kafka_opts.label:
"""Confluent Producer"""
kafka_topic.desc:
"""Event Hub name"""
kafka_topic.label:
"""Event Hub Name"""
kafka_message_timestamp.desc:
"""Which timestamp to use. The timestamp is expected to be a millisecond precision Unix epoch which can be in string format, e.g. <code>1661326462115</code> or <code>'1661326462115'</code>. When the desired data field for this template is not found, or if the found data is not a valid integer, the current system timestamp will be used."""
kafka_message_timestamp.label:
"""Message Timestamp"""
buffer_mode.desc:
"""Message buffer mode.
<code>memory</code>: Buffer all messages in memory. The messages will be lost in case of EMQX node restart
<code>disk</code>: Buffer all messages on disk. The messages on disk are able to survive EMQX node restart.
<code>hybrid</code>: Buffer message in memory first, when up to certain limit (see <code>segment_bytes</code> config for more information), then start offloading messages to disk, Like <code>memory</code> mode, the messages will be lost in case of EMQX node restart."""
buffer_mode.label:
"""Buffer Mode"""
socket_opts.desc:
"""Extra socket options."""
socket_opts.label:
"""Socket Options"""
partition_count_refresh_interval.desc:
"""The time interval for Confluent producer to discover increased number of partitions.
After the number of partitions is increased in Confluent, EMQX will start taking the
discovered partitions into account when dispatching messages per <code>partition_strategy</code>."""
partition_count_refresh_interval.label:
"""Partition Count Refresh Interval"""
max_batch_bytes.desc:
"""Maximum bytes to collect in a Confluent message batch. Most of the Kafka brokers default to a limit of 1 MB batch size. EMQX's default value is less than 1 MB in order to compensate Kafka message encoding overheads (especially when each individual message is very small). When a single message is over the limit, it is still sent (as a single element batch)."""
max_batch_bytes.label:
"""Max Batch Bytes"""
required_acks.desc:
"""Required acknowledgements for Confluent partition leader to wait for its followers before it sends back the acknowledgement to EMQX Confluent producer
<code>all_isr</code>: Require all in-sync replicas to acknowledge.
<code>leader_only</code>: Require only the partition-leader's acknowledgement."""
required_acks.label:
"""Required Acks"""
kafka_headers.desc:
"""Please provide a placeholder to be used as Confluent Headers<br/>
e.g. <code>${pub_props}</code><br/>
Notice that the value of the placeholder must either be an object:
<code>{\"foo\": \"bar\"}</code>
or an array of key-value pairs:
<code>[{\"key\": \"foo\", \"value\": \"bar\"}]</code>"""
kafka_headers.label:
"""Confluent Headers"""
producer_kafka_ext_headers.desc:
"""Please provide more key-value pairs for Confluent headers<br/>
The key-value pairs here will be combined with the
value of <code>kafka_headers</code> field before sending to Confluent."""
producer_kafka_ext_headers.label:
"""Extra Confluent headers"""
producer_kafka_ext_header_key.desc:
"""Key of the Confluent header. Placeholders in format of ${var} are supported."""
producer_kafka_ext_header_key.label:
"""Confluent extra header key."""
producer_kafka_ext_header_value.desc:
"""Value of the Confluent header. Placeholders in format of ${var} are supported."""
producer_kafka_ext_header_value.label:
"""Value"""
kafka_header_value_encode_mode.desc:
"""Confluent headers value encode mode<br/>
- NONE: only add binary values to Confluent headers;<br/>
- JSON: only add JSON values to Confluent headers,
and encode them to JSON strings before sending."""
kafka_header_value_encode_mode.label:
"""Confluent headers value encode mode"""
metadata_request_timeout.desc:
"""Maximum wait time when fetching metadata from Confluent."""
metadata_request_timeout.label:
"""Metadata Request Timeout"""
desc_type.desc:
"""The Action Type"""
desc_type.label:
"""Action Type"""
socket_nodelay.desc:
"""When set to 'true', TCP buffer is sent as soon as possible. Otherwise, the OS kernel may buffer small TCP packets for a while (40 ms by default)."""
socket_nodelay.label:
"""No Delay"""
authentication.desc:
"""Authentication configs."""
authentication.label:
"""Authentication"""
connector_type.label:
"""Connector Type"""
connector_type.desc:
"""The type of the connector."""
bridge_v2_type.label:
"""Action Type"""
bridge_v2_type.desc:
"""The type of the action."""
actions.label:
"""Action Config"""
actions.desc:
"""The configuration for an action."""
buffer_memory_overload_protection.desc:
"""Applicable when buffer mode is set to <code>memory</code>
EMQX will drop old buffered messages under high memory pressure. The high memory threshold is defined in config <code>sysmon.os.sysmem_high_watermark</code>. NOTE: This config only works on Linux."""
buffer_memory_overload_protection.label:
"""Memory Overload Protection"""
auth_sasl_mechanism.desc:
"""SASL authentication mechanism."""
auth_sasl_mechanism.label:
"""Mechanism"""
config_enable.desc:
"""Enable (true) or disable (false) this action."""
config_enable.label:
"""Enable or Disable"""
desc_config.desc:
"""Configuration for a Confluent action."""
desc_config.label:
"""Confluent Action Configuration"""
buffer_per_partition_limit.desc:
"""Number of bytes allowed to buffer for each Confluent partition. When this limit is exceeded, old messages will be dropped in a trade for credits for new messages to be buffered."""
buffer_per_partition_limit.label:
"""Per-partition Buffer Limit"""
bootstrap_hosts.desc:
"""A comma separated list of Confluent Kafka <code>host[:port]</code> namespace endpoints to bootstrap the client. Default port number is 9092."""
bootstrap_hosts.label:
"""Bootstrap Server"""
kafka_message_key.desc:
"""Template to render Confluent message key. If the template is rendered into a NULL value (i.e. there is no such data field in Rule Engine context) then Confluent's <code>NULL</code> (but not empty string) is used."""
kafka_message_key.label:
"""Message Key"""
kafka_message.desc:
"""Template to render a Confluent message."""
kafka_message.label:
"""Confluent Message Template"""
mqtt_topic.desc:
"""MQTT topic or topic filter as data source (action input). If rule action is used as data source, this config should be left empty, otherwise messages will be duplicated in Confluent."""
mqtt_topic.label:
"""Source MQTT Topic"""
kafka_message_value.desc:
"""Template to render Confluent message value. If the template is rendered into a NULL value (i.e. there is no such data field in Rule Engine context) then Confluent's <code>NULL</code> (but not empty string) is used."""
kafka_message_value.label:
"""Message Value"""
partition_strategy.desc:
"""Partition strategy is to tell the producer how to dispatch messages to Confluent partitions.
<code>random</code>: Randomly pick a partition for each message
<code>key_dispatch</code>: Hash Confluent message key to a partition number"""
partition_strategy.label:
"""Partition Strategy"""
buffer_segment_bytes.desc:
"""Applicable when buffer mode is set to <code>disk</code> or <code>hybrid</code>.
This value is to specify the size of each on-disk buffer file."""
buffer_segment_bytes.label:
"""Segment File Bytes"""
max_inflight.desc:
"""Maximum number of batches allowed for Confluent producer (per-partition) to send before receiving acknowledgement from Confluent. Greater value typically means better throughput. However, there can be a risk of message reordering when this value is greater than 1."""
max_inflight.label:
"""Max Inflight"""
compression.desc:
"""Compression method."""
compression.label:
"""Compression"""
query_mode.desc:
"""Query mode. Optional 'sync/async', default 'async'."""
query_mode.label:
"""Query mode"""
sync_query_timeout.desc:
"""This parameter defines the timeout limit for synchronous queries. It applies only when the action query mode is configured to 'sync'."""
sync_query_timeout.label:
"""Synchronous Query Timeout"""
auth_username_password.desc:
"""Username/password based authentication."""
auth_username_password.label:
"""Username/password Auth"""
auth_sasl_username.desc:
"""Confluent Key."""
auth_sasl_username.label:
"""Key"""
auth_sasl_password.desc:
"""Confluent Secret."""
auth_sasl_password.label:
"""Secret"""
ssl_client_opts.desc:
"""TLS/SSL options for Confluent client."""
ssl_client_opts.label:
"""TLS/SSL options"""
server_name_indication.desc:
"""Server Name Indication (SNI) setting for TLS handshake.<br/>
- <code>auto</code>: The client will use <code>"servicebus.windows.net"</code> as SNI.<br/>
- <code>disable</code>: If you wish to prevent the client from sending the SNI.<br/>
- Other string values it will be sent as-is."""
server_name_indication.label:
"""SNI"""
}