Merge pull request #13518 from thalesmg/20240724-r57-dynamic-kprodu-action-mkIII

feat(kafka producer): allow dynamic topics (mkIII)
This commit is contained in:
JianBo He 2024-07-29 22:43:20 +08:00 committed by GitHub
commit c637422302
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
18 changed files with 396 additions and 94 deletions

View File

@ -23,7 +23,7 @@ defmodule EMQXBridgeAzureEventHub.MixProject do
def deps() do def deps() do
[ [
{:wolff, github: "kafka4beam/wolff", tag: "2.0.0"}, {:wolff, github: "kafka4beam/wolff", tag: "3.0.2"},
{:kafka_protocol, github: "kafka4beam/kafka_protocol", tag: "4.1.5", override: true}, {:kafka_protocol, github: "kafka4beam/kafka_protocol", tag: "4.1.5", override: true},
{:brod_gssapi, github: "kafka4beam/brod_gssapi", tag: "v0.1.1"}, {:brod_gssapi, github: "kafka4beam/brod_gssapi", tag: "v0.1.1"},
{:brod, github: "kafka4beam/brod", tag: "3.18.0"}, {:brod, github: "kafka4beam/brod", tag: "3.18.0"},

View File

@ -2,7 +2,7 @@
{erl_opts, [debug_info]}. {erl_opts, [debug_info]}.
{deps, [ {deps, [
{wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "2.0.0"}}}, {wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "3.0.2"}}},
{kafka_protocol, {git, "https://github.com/kafka4beam/kafka_protocol.git", {tag, "4.1.5"}}}, {kafka_protocol, {git, "https://github.com/kafka4beam/kafka_protocol.git", {tag, "4.1.5"}}},
{brod_gssapi, {git, "https://github.com/kafka4beam/brod_gssapi.git", {tag, "v0.1.1"}}}, {brod_gssapi, {git, "https://github.com/kafka4beam/brod_gssapi.git", {tag, "v0.1.1"}}},
{brod, {git, "https://github.com/kafka4beam/brod.git", {tag, "3.18.0"}}}, {brod, {git, "https://github.com/kafka4beam/brod.git", {tag, "3.18.0"}}},

View File

@ -382,12 +382,31 @@ t_multiple_actions_sharing_topic(Config) ->
ActionConfig0, ActionConfig0,
#{<<"parameters">> => #{<<"query_mode">> => <<"sync">>}} #{<<"parameters">> => #{<<"query_mode">> => <<"sync">>}}
), ),
ok = emqx_bridge_v2_kafka_producer_SUITE:t_multiple_actions_sharing_topic( ok =
[ emqx_bridge_v2_kafka_producer_SUITE:?FUNCTION_NAME(
{type, ?BRIDGE_TYPE_BIN}, [
{connector_name, ?config(connector_name, Config)}, {type, ?BRIDGE_TYPE_BIN},
{connector_config, ?config(connector_config, Config)}, {connector_name, ?config(connector_name, Config)},
{action_config, ActionConfig} {connector_config, ?config(connector_config, Config)},
] {action_config, ActionConfig}
), ]
),
ok.
t_dynamic_topics(Config) ->
ActionConfig0 = ?config(action_config, Config),
ActionConfig =
emqx_utils_maps:deep_merge(
ActionConfig0,
#{<<"parameters">> => #{<<"query_mode">> => <<"sync">>}}
),
ok =
emqx_bridge_v2_kafka_producer_SUITE:?FUNCTION_NAME(
[
{type, ?BRIDGE_TYPE_BIN},
{connector_name, ?config(connector_name, Config)},
{connector_config, ?config(connector_config, Config)},
{action_config, ActionConfig}
]
),
ok. ok.

View File

@ -23,7 +23,7 @@ defmodule EMQXBridgeConfluent.MixProject do
def deps() do def deps() do
[ [
{:wolff, github: "kafka4beam/wolff", tag: "2.0.0"}, {:wolff, github: "kafka4beam/wolff", tag: "3.0.2"},
{:kafka_protocol, github: "kafka4beam/kafka_protocol", tag: "4.1.5", override: true}, {:kafka_protocol, github: "kafka4beam/kafka_protocol", tag: "4.1.5", override: true},
{:brod_gssapi, github: "kafka4beam/brod_gssapi", tag: "v0.1.1"}, {:brod_gssapi, github: "kafka4beam/brod_gssapi", tag: "v0.1.1"},
{:brod, github: "kafka4beam/brod", tag: "3.18.0"}, {:brod, github: "kafka4beam/brod", tag: "3.18.0"},

View File

@ -2,7 +2,7 @@
{erl_opts, [debug_info]}. {erl_opts, [debug_info]}.
{deps, [ {deps, [
{wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "2.0.0"}}}, {wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "3.0.2"}}},
{kafka_protocol, {git, "https://github.com/kafka4beam/kafka_protocol.git", {tag, "4.1.5"}}}, {kafka_protocol, {git, "https://github.com/kafka4beam/kafka_protocol.git", {tag, "4.1.5"}}},
{brod_gssapi, {git, "https://github.com/kafka4beam/brod_gssapi.git", {tag, "v0.1.1"}}}, {brod_gssapi, {git, "https://github.com/kafka4beam/brod_gssapi.git", {tag, "v0.1.1"}}},
{brod, {git, "https://github.com/kafka4beam/brod.git", {tag, "3.18.0"}}}, {brod, {git, "https://github.com/kafka4beam/brod.git", {tag, "3.18.0"}}},

View File

@ -391,12 +391,31 @@ t_multiple_actions_sharing_topic(Config) ->
ActionConfig0, ActionConfig0,
#{<<"parameters">> => #{<<"query_mode">> => <<"sync">>}} #{<<"parameters">> => #{<<"query_mode">> => <<"sync">>}}
), ),
ok = emqx_bridge_v2_kafka_producer_SUITE:t_multiple_actions_sharing_topic( ok =
[ emqx_bridge_v2_kafka_producer_SUITE:?FUNCTION_NAME(
{type, ?ACTION_TYPE_BIN}, [
{connector_name, ?config(connector_name, Config)}, {type, ?ACTION_TYPE_BIN},
{connector_config, ?config(connector_config, Config)}, {connector_name, ?config(connector_name, Config)},
{action_config, ActionConfig} {connector_config, ?config(connector_config, Config)},
] {action_config, ActionConfig}
), ]
),
ok.
t_dynamic_topics(Config) ->
ActionConfig0 = ?config(action_config, Config),
ActionConfig =
emqx_utils_maps:deep_merge(
ActionConfig0,
#{<<"parameters">> => #{<<"query_mode">> => <<"sync">>}}
),
ok =
emqx_bridge_v2_kafka_producer_SUITE:?FUNCTION_NAME(
[
{type, ?ACTION_TYPE_BIN},
{connector_name, ?config(connector_name, Config)},
{connector_config, ?config(connector_config, Config)},
{action_config, ActionConfig}
]
),
ok. ok.

View File

@ -23,7 +23,7 @@ defmodule EMQXBridgeKafka.MixProject do
def deps() do def deps() do
[ [
{:wolff, github: "kafka4beam/wolff", tag: "2.0.0"}, {:wolff, github: "kafka4beam/wolff", tag: "3.0.2"},
{:kafka_protocol, github: "kafka4beam/kafka_protocol", tag: "4.1.5", override: true}, {:kafka_protocol, github: "kafka4beam/kafka_protocol", tag: "4.1.5", override: true},
{:brod_gssapi, github: "kafka4beam/brod_gssapi", tag: "v0.1.1"}, {:brod_gssapi, github: "kafka4beam/brod_gssapi", tag: "v0.1.1"},
{:brod, github: "kafka4beam/brod", tag: "3.18.0"}, {:brod, github: "kafka4beam/brod", tag: "3.18.0"},

View File

@ -2,7 +2,7 @@
{erl_opts, [debug_info]}. {erl_opts, [debug_info]}.
{deps, [ {deps, [
{wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "2.0.0"}}}, {wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "3.0.2"}}},
{kafka_protocol, {git, "https://github.com/kafka4beam/kafka_protocol.git", {tag, "4.1.5"}}}, {kafka_protocol, {git, "https://github.com/kafka4beam/kafka_protocol.git", {tag, "4.1.5"}}},
{brod_gssapi, {git, "https://github.com/kafka4beam/brod_gssapi.git", {tag, "v0.1.1"}}}, {brod_gssapi, {git, "https://github.com/kafka4beam/brod_gssapi.git", {tag, "v0.1.1"}}},
{brod, {git, "https://github.com/kafka4beam/brod.git", {tag, "3.18.0"}}}, {brod, {git, "https://github.com/kafka4beam/brod.git", {tag, "3.18.0"}}},

View File

@ -295,6 +295,7 @@ fields("config_producer") ->
fields("config_consumer") -> fields("config_consumer") ->
fields(kafka_consumer); fields(kafka_consumer);
fields(kafka_producer) -> fields(kafka_producer) ->
%% Schema used by bridges V1.
connector_config_fields() ++ producer_opts(v1); connector_config_fields() ++ producer_opts(v1);
fields(kafka_producer_action) -> fields(kafka_producer_action) ->
[ [
@ -364,9 +365,33 @@ fields(socket_opts) ->
validator => fun emqx_schema:validate_tcp_keepalive/1 validator => fun emqx_schema:validate_tcp_keepalive/1
})} })}
]; ];
fields(v1_producer_kafka_opts) ->
OldSchemaFields =
[
topic,
message,
max_batch_bytes,
compression,
partition_strategy,
required_acks,
kafka_headers,
kafka_ext_headers,
kafka_header_value_encode_mode,
partition_count_refresh_interval,
partitions_limit,
max_inflight,
buffer,
query_mode,
sync_query_timeout
],
Fields = fields(producer_kafka_opts),
lists:filter(
fun({K, _V}) -> lists:member(K, OldSchemaFields) end,
Fields
);
fields(producer_kafka_opts) -> fields(producer_kafka_opts) ->
[ [
{topic, mk(string(), #{required => true, desc => ?DESC(kafka_topic)})}, {topic, mk(emqx_schema:template(), #{required => true, desc => ?DESC(kafka_topic)})},
{message, mk(ref(kafka_message), #{required => false, desc => ?DESC(kafka_message)})}, {message, mk(ref(kafka_message), #{required => false, desc => ?DESC(kafka_message)})},
{max_batch_bytes, {max_batch_bytes,
mk(emqx_schema:bytesize(), #{default => <<"896KB">>, desc => ?DESC(max_batch_bytes)})}, mk(emqx_schema:bytesize(), #{default => <<"896KB">>, desc => ?DESC(max_batch_bytes)})},
@ -675,15 +700,15 @@ resource_opts() ->
%% However we need to keep it backward compatible for generated schema json (version 0.1.0) %% However we need to keep it backward compatible for generated schema json (version 0.1.0)
%% since schema is data for the 'schemas' API. %% since schema is data for the 'schemas' API.
parameters_field(ActionOrBridgeV1) -> parameters_field(ActionOrBridgeV1) ->
{Name, Alias} = {Name, Alias, Ref} =
case ActionOrBridgeV1 of case ActionOrBridgeV1 of
v1 -> v1 ->
{kafka, parameters}; {kafka, parameters, v1_producer_kafka_opts};
action -> action ->
{parameters, kafka} {parameters, kafka, producer_kafka_opts}
end, end,
{Name, {Name,
mk(ref(producer_kafka_opts), #{ mk(ref(Ref), #{
required => true, required => true,
aliases => [Alias], aliases => [Alias],
desc => ?DESC(producer_kafka_opts), desc => ?DESC(producer_kafka_opts),

View File

@ -3,6 +3,8 @@
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
-module(emqx_bridge_kafka_impl_producer). -module(emqx_bridge_kafka_impl_producer).
-feature(maybe_expr, enable).
-behaviour(emqx_resource). -behaviour(emqx_resource).
-include_lib("emqx_resource/include/emqx_resource.hrl"). -include_lib("emqx_resource/include/emqx_resource.hrl").
@ -122,8 +124,8 @@ on_add_channel(
{ok, NewState}. {ok, NewState}.
create_producers_for_bridge_v2( create_producers_for_bridge_v2(
InstId, ConnResId,
BridgeV2Id, ActionResId,
ClientId, ClientId,
#{ #{
bridge_type := BridgeType, bridge_type := BridgeType,
@ -132,33 +134,42 @@ create_producers_for_bridge_v2(
) -> ) ->
#{ #{
message := MessageTemplate, message := MessageTemplate,
topic := KafkaTopic, topic := KafkaTopic0,
sync_query_timeout := SyncQueryTimeout sync_query_timeout := SyncQueryTimeout
} = KafkaConfig, } = KafkaConfig,
TopicTemplate = {TopicType, TopicOrTemplate} = maybe_preproc_topic(KafkaTopic0),
MKafkaTopic =
case TopicType of
fixed -> TopicOrTemplate;
dynamic -> dynamic
end,
KafkaHeadersTokens = preproc_kafka_headers(maps:get(kafka_headers, KafkaConfig, undefined)), KafkaHeadersTokens = preproc_kafka_headers(maps:get(kafka_headers, KafkaConfig, undefined)),
KafkaExtHeadersTokens = preproc_ext_headers(maps:get(kafka_ext_headers, KafkaConfig, [])), KafkaExtHeadersTokens = preproc_ext_headers(maps:get(kafka_ext_headers, KafkaConfig, [])),
KafkaHeadersValEncodeMode = maps:get(kafka_header_value_encode_mode, KafkaConfig, none), KafkaHeadersValEncodeMode = maps:get(kafka_header_value_encode_mode, KafkaConfig, none),
MaxPartitions = maps:get(partitions_limit, KafkaConfig, all_partitions), MaxPartitions = maps:get(partitions_limit, KafkaConfig, all_partitions),
#{name := BridgeName} = emqx_bridge_v2:parse_id(BridgeV2Id), #{name := BridgeName} = emqx_bridge_v2:parse_id(ActionResId),
IsDryRun = emqx_resource:is_dry_run(BridgeV2Id), IsDryRun = emqx_resource:is_dry_run(ActionResId),
ok = check_topic_and_leader_connections(ClientId, KafkaTopic, MaxPartitions), ok = check_topic_and_leader_connections(ActionResId, ClientId, MKafkaTopic, MaxPartitions),
WolffProducerConfig = producers_config( WolffProducerConfig = producers_config(
BridgeType, BridgeName, KafkaConfig, IsDryRun, BridgeV2Id BridgeType, BridgeName, KafkaConfig, IsDryRun, ActionResId
), ),
case wolff:ensure_supervised_producers(ClientId, KafkaTopic, WolffProducerConfig) of case wolff:ensure_supervised_dynamic_producers(ClientId, WolffProducerConfig) of
{ok, Producers} -> {ok, Producers} ->
ok = emqx_resource:allocate_resource(InstId, {?kafka_producers, BridgeV2Id}, Producers),
ok = emqx_resource:allocate_resource( ok = emqx_resource:allocate_resource(
InstId, {?kafka_telemetry_id, BridgeV2Id}, BridgeV2Id ConnResId, {?kafka_producers, ActionResId}, Producers
), ),
_ = maybe_install_wolff_telemetry_handlers(BridgeV2Id), ok = emqx_resource:allocate_resource(
ConnResId, {?kafka_telemetry_id, ActionResId}, ActionResId
),
_ = maybe_install_wolff_telemetry_handlers(ActionResId),
{ok, #{ {ok, #{
message_template => compile_message_template(MessageTemplate), message_template => compile_message_template(MessageTemplate),
kafka_client_id => ClientId, kafka_client_id => ClientId,
kafka_topic => KafkaTopic, topic_template => TopicTemplate,
topic => MKafkaTopic,
producers => Producers, producers => Producers,
resource_id => BridgeV2Id, resource_id => ActionResId,
connector_resource_id => InstId, connector_resource_id => ConnResId,
sync_query_timeout => SyncQueryTimeout, sync_query_timeout => SyncQueryTimeout,
kafka_config => KafkaConfig, kafka_config => KafkaConfig,
headers_tokens => KafkaHeadersTokens, headers_tokens => KafkaHeadersTokens,
@ -169,9 +180,9 @@ create_producers_for_bridge_v2(
{error, Reason2} -> {error, Reason2} ->
?SLOG(error, #{ ?SLOG(error, #{
msg => "failed_to_start_kafka_producer", msg => "failed_to_start_kafka_producer",
instance_id => InstId, instance_id => ConnResId,
kafka_client_id => ClientId, kafka_client_id => ClientId,
kafka_topic => KafkaTopic, kafka_topic => MKafkaTopic,
reason => Reason2 reason => Reason2
}), }),
throw( throw(
@ -264,7 +275,9 @@ remove_producers_for_bridge_v2(
ClientId = maps:get(?kafka_client_id, AllocatedResources, no_client_id), ClientId = maps:get(?kafka_client_id, AllocatedResources, no_client_id),
maps:foreach( maps:foreach(
fun fun
({?kafka_producers, BridgeV2IdCheck}, Producers) when BridgeV2IdCheck =:= BridgeV2Id -> ({?kafka_producers, BridgeV2IdCheck}, Producers) when
BridgeV2IdCheck =:= BridgeV2Id
->
deallocate_producers(ClientId, Producers); deallocate_producers(ClientId, Producers);
({?kafka_telemetry_id, BridgeV2IdCheck}, TelemetryId) when ({?kafka_telemetry_id, BridgeV2IdCheck}, TelemetryId) when
BridgeV2IdCheck =:= BridgeV2Id BridgeV2IdCheck =:= BridgeV2Id
@ -297,7 +310,8 @@ on_query(
#{installed_bridge_v2s := BridgeV2Configs} = _ConnectorState #{installed_bridge_v2s := BridgeV2Configs} = _ConnectorState
) -> ) ->
#{ #{
message_template := Template, message_template := MessageTemplate,
topic_template := TopicTemplate,
producers := Producers, producers := Producers,
sync_query_timeout := SyncTimeout, sync_query_timeout := SyncTimeout,
headers_tokens := KafkaHeadersTokens, headers_tokens := KafkaHeadersTokens,
@ -310,7 +324,8 @@ on_query(
headers_val_encode_mode => KafkaHeadersValEncodeMode headers_val_encode_mode => KafkaHeadersValEncodeMode
}, },
try try
KafkaMessage = render_message(Template, KafkaHeaders, Message), KafkaTopic = render_topic(TopicTemplate, Message),
KafkaMessage = render_message(MessageTemplate, KafkaHeaders, Message),
?tp( ?tp(
emqx_bridge_kafka_impl_producer_sync_query, emqx_bridge_kafka_impl_producer_sync_query,
#{headers_config => KafkaHeaders, instance_id => InstId} #{headers_config => KafkaHeaders, instance_id => InstId}
@ -318,9 +333,15 @@ on_query(
emqx_trace:rendered_action_template(MessageTag, #{ emqx_trace:rendered_action_template(MessageTag, #{
message => KafkaMessage message => KafkaMessage
}), }),
do_send_msg(sync, KafkaMessage, Producers, SyncTimeout) do_send_msg(sync, KafkaTopic, KafkaMessage, Producers, SyncTimeout)
catch catch
error:{invalid_partition_count, Count, _Partitioner} -> throw:bad_topic ->
?tp("kafka_producer_failed_to_render_topic", #{}),
{error, {unrecoverable_error, failed_to_render_topic}};
throw:#{cause := unknown_topic_or_partition, topic := Topic} ->
?tp("kafka_producer_resolved_to_unknown_topic", #{}),
{error, {unrecoverable_error, {resolved_to_unknown_topic, Topic}}};
throw:#{cause := invalid_partition_count, count := Count} ->
?tp("kafka_producer_invalid_partition_count", #{ ?tp("kafka_producer_invalid_partition_count", #{
action_id => MessageTag, action_id => MessageTag,
query_mode => sync query_mode => sync
@ -365,6 +386,7 @@ on_query_async(
) -> ) ->
#{ #{
message_template := Template, message_template := Template,
topic_template := TopicTemplate,
producers := Producers, producers := Producers,
headers_tokens := KafkaHeadersTokens, headers_tokens := KafkaHeadersTokens,
ext_headers_tokens := KafkaExtHeadersTokens, ext_headers_tokens := KafkaExtHeadersTokens,
@ -376,6 +398,7 @@ on_query_async(
headers_val_encode_mode => KafkaHeadersValEncodeMode headers_val_encode_mode => KafkaHeadersValEncodeMode
}, },
try try
KafkaTopic = render_topic(TopicTemplate, Message),
KafkaMessage = render_message(Template, KafkaHeaders, Message), KafkaMessage = render_message(Template, KafkaHeaders, Message),
?tp( ?tp(
emqx_bridge_kafka_impl_producer_async_query, emqx_bridge_kafka_impl_producer_async_query,
@ -384,9 +407,15 @@ on_query_async(
emqx_trace:rendered_action_template(MessageTag, #{ emqx_trace:rendered_action_template(MessageTag, #{
message => KafkaMessage message => KafkaMessage
}), }),
do_send_msg(async, KafkaMessage, Producers, AsyncReplyFn) do_send_msg(async, KafkaTopic, KafkaMessage, Producers, AsyncReplyFn)
catch catch
error:{invalid_partition_count, Count, _Partitioner} -> throw:bad_topic ->
?tp("kafka_producer_failed_to_render_topic", #{}),
{error, {unrecoverable_error, failed_to_render_topic}};
throw:#{cause := unknown_topic_or_partition, topic := Topic} ->
?tp("kafka_producer_resolved_to_unknown_topic", #{}),
{error, {unrecoverable_error, {resolved_to_unknown_topic, Topic}}};
throw:#{cause := invalid_partition_count, count := Count} ->
?tp("kafka_producer_invalid_partition_count", #{ ?tp("kafka_producer_invalid_partition_count", #{
action_id => MessageTag, action_id => MessageTag,
query_mode => async query_mode => async
@ -424,9 +453,28 @@ compile_message_template(T) ->
timestamp => preproc_tmpl(TimestampTemplate) timestamp => preproc_tmpl(TimestampTemplate)
}. }.
maybe_preproc_topic(Topic) ->
Template = emqx_template:parse(Topic),
case emqx_template:placeholders(Template) of
[] ->
{fixed, bin(Topic)};
[_ | _] ->
{dynamic, Template}
end.
preproc_tmpl(Tmpl) -> preproc_tmpl(Tmpl) ->
emqx_placeholder:preproc_tmpl(Tmpl). emqx_placeholder:preproc_tmpl(Tmpl).
render_topic({fixed, KafkaTopic}, _Message) ->
KafkaTopic;
render_topic({dynamic, Template}, Message) ->
try
iolist_to_binary(emqx_template:render_strict(Template, Message))
catch
error:_Errors ->
throw(bad_topic)
end.
render_message( render_message(
#{key := KeyTemplate, value := ValueTemplate, timestamp := TimestampTemplate}, #{key := KeyTemplate, value := ValueTemplate, timestamp := TimestampTemplate},
#{ #{
@ -468,9 +516,11 @@ render_timestamp(Template, Message) ->
erlang:system_time(millisecond) erlang:system_time(millisecond)
end. end.
do_send_msg(sync, KafkaMessage, Producers, SyncTimeout) -> do_send_msg(sync, KafkaTopic, KafkaMessage, Producers, SyncTimeout) ->
try try
{_Partition, _Offset} = wolff:send_sync(Producers, [KafkaMessage], SyncTimeout), {_Partition, _Offset} = wolff:send_sync2(
Producers, KafkaTopic, [KafkaMessage], SyncTimeout
),
ok ok
catch catch
error:{producer_down, _} = Reason -> error:{producer_down, _} = Reason ->
@ -478,7 +528,7 @@ do_send_msg(sync, KafkaMessage, Producers, SyncTimeout) ->
error:timeout -> error:timeout ->
{error, timeout} {error, timeout}
end; end;
do_send_msg(async, KafkaMessage, Producers, AsyncReplyFn) -> do_send_msg(async, KafkaTopic, KafkaMessage, Producers, AsyncReplyFn) ->
%% * Must be a batch because wolff:send and wolff:send_sync are batch APIs %% * Must be a batch because wolff:send and wolff:send_sync are batch APIs
%% * Must be a single element batch because wolff books calls, but not batch sizes %% * Must be a single element batch because wolff books calls, but not batch sizes
%% for counters and gauges. %% for counters and gauges.
@ -486,7 +536,9 @@ do_send_msg(async, KafkaMessage, Producers, AsyncReplyFn) ->
%% The retuned information is discarded here. %% The retuned information is discarded here.
%% If the producer process is down when sending, this function would %% If the producer process is down when sending, this function would
%% raise an error exception which is to be caught by the caller of this callback %% raise an error exception which is to be caught by the caller of this callback
{_Partition, Pid} = wolff:send(Producers, Batch, {fun ?MODULE:on_kafka_ack/3, [AsyncReplyFn]}), {_Partition, Pid} = wolff:send2(
Producers, KafkaTopic, Batch, {fun ?MODULE:on_kafka_ack/3, [AsyncReplyFn]}
),
%% this Pid is so far never used because Kafka producer is by-passing the buffer worker %% this Pid is so far never used because Kafka producer is by-passing the buffer worker
{ok, Pid}. {ok, Pid}.
@ -527,20 +579,23 @@ on_get_status(
end. end.
on_get_channel_status( on_get_channel_status(
_ResId, _ConnResId,
ChannelId, ActionResId,
#{ #{
client_id := ClientId, client_id := ClientId,
installed_bridge_v2s := Channels installed_bridge_v2s := Channels
} = _State } = _ConnState
) -> ) ->
%% Note: we must avoid returning `?status_disconnected' here. Returning %% Note: we must avoid returning `?status_disconnected' here. Returning
%% `?status_disconnected' will make resource manager try to restart the producers / %% `?status_disconnected' will make resource manager try to restart the producers /
%% connector, thus potentially dropping data held in wolff producer's replayq. The %% connector, thus potentially dropping data held in wolff producer's replayq. The
%% only exception is if the topic does not exist ("unhealthy target"). %% only exception is if the topic does not exist ("unhealthy target").
#{kafka_topic := KafkaTopic, partitions_limit := MaxPartitions} = maps:get(ChannelId, Channels), #{
topic := MKafkaTopic,
partitions_limit := MaxPartitions
} = maps:get(ActionResId, Channels),
try try
ok = check_topic_and_leader_connections(ClientId, KafkaTopic, MaxPartitions), ok = check_topic_and_leader_connections(ActionResId, ClientId, MKafkaTopic, MaxPartitions),
?status_connected ?status_connected
catch catch
throw:{unhealthy_target, Msg} -> throw:{unhealthy_target, Msg} ->
@ -549,22 +604,29 @@ on_get_channel_status(
{?status_connecting, {K, E}} {?status_connecting, {K, E}}
end. end.
check_topic_and_leader_connections(ClientId, KafkaTopic, MaxPartitions) -> check_topic_and_leader_connections(ActionResId, ClientId, MKafkaTopic, MaxPartitions) ->
case wolff_client_sup:find_client(ClientId) of case wolff_client_sup:find_client(ClientId) of
{ok, Pid} -> {ok, Pid} ->
ok = check_topic_status(ClientId, Pid, KafkaTopic), maybe
ok = check_if_healthy_leaders(ClientId, Pid, KafkaTopic, MaxPartitions); true ?= is_binary(MKafkaTopic),
ok = check_topic_status(ClientId, Pid, MKafkaTopic),
ok = check_if_healthy_leaders(
ActionResId, ClientId, Pid, MKafkaTopic, MaxPartitions
)
else
false -> ok
end;
{error, #{reason := no_such_client}} -> {error, #{reason := no_such_client}} ->
throw(#{ throw(#{
reason => cannot_find_kafka_client, reason => cannot_find_kafka_client,
kafka_client => ClientId, kafka_client => ClientId,
kafka_topic => KafkaTopic kafka_topic => MKafkaTopic
}); });
{error, #{reason := client_supervisor_not_initialized}} -> {error, #{reason := client_supervisor_not_initialized}} ->
throw(#{ throw(#{
reason => restarting, reason => restarting,
kafka_client => ClientId, kafka_client => ClientId,
kafka_topic => KafkaTopic kafka_topic => MKafkaTopic
}) })
end. end.
@ -591,8 +653,10 @@ error_summary(Map, [Error]) ->
error_summary(Map, [Error | More]) -> error_summary(Map, [Error | More]) ->
Map#{first_error => Error, total_errors => length(More) + 1}. Map#{first_error => Error, total_errors => length(More) + 1}.
check_if_healthy_leaders(ClientId, ClientPid, KafkaTopic, MaxPartitions) when is_pid(ClientPid) -> check_if_healthy_leaders(ActionResId, ClientId, ClientPid, KafkaTopic, MaxPartitions) when
case wolff_client:get_leader_connections(ClientPid, KafkaTopic, MaxPartitions) of is_pid(ClientPid)
->
case wolff_client:get_leader_connections(ClientPid, ActionResId, KafkaTopic, MaxPartitions) of
{ok, Leaders} -> {ok, Leaders} ->
%% Kafka is considered healthy as long as any of the partition leader is reachable. %% Kafka is considered healthy as long as any of the partition leader is reachable.
case lists:partition(fun({_Partition, Pid}) -> is_alive(Pid) end, Leaders) of case lists:partition(fun({_Partition, Pid}) -> is_alive(Pid) end, Leaders) of
@ -654,7 +718,7 @@ ssl(#{enable := true} = SSL) ->
ssl(_) -> ssl(_) ->
false. false.
producers_config(BridgeType, BridgeName, Input, IsDryRun, BridgeV2Id) -> producers_config(BridgeType, BridgeName, Input, IsDryRun, ActionResId) ->
#{ #{
max_batch_bytes := MaxBatchBytes, max_batch_bytes := MaxBatchBytes,
compression := Compression, compression := Compression,
@ -696,8 +760,8 @@ producers_config(BridgeType, BridgeName, Input, IsDryRun, BridgeV2Id) ->
max_batch_bytes => MaxBatchBytes, max_batch_bytes => MaxBatchBytes,
max_send_ahead => MaxInflight - 1, max_send_ahead => MaxInflight - 1,
compression => Compression, compression => Compression,
alias => BridgeV2Id, group => ActionResId,
telemetry_meta_data => #{bridge_id => BridgeV2Id}, telemetry_meta_data => #{bridge_id => ActionResId},
max_partitions => MaxPartitions max_partitions => MaxPartitions
}. }.
@ -773,20 +837,19 @@ handle_telemetry_event(_EventId, _Metrics, _MetaData, _HandlerConfig) ->
%% Note: don't use the instance/manager ID, as that changes everytime %% Note: don't use the instance/manager ID, as that changes everytime
%% the bridge is recreated, and will lead to multiplication of %% the bridge is recreated, and will lead to multiplication of
%% metrics. %% metrics.
-spec telemetry_handler_id(resource_id()) -> binary(). -spec telemetry_handler_id(action_resource_id()) -> binary().
telemetry_handler_id(ResourceID) -> telemetry_handler_id(ActionResId) ->
<<"emqx-bridge-kafka-producer-", ResourceID/binary>>. ActionResId.
uninstall_telemetry_handlers(ResourceID) -> uninstall_telemetry_handlers(TelemetryId) ->
HandlerID = telemetry_handler_id(ResourceID), telemetry:detach(TelemetryId).
telemetry:detach(HandlerID).
maybe_install_wolff_telemetry_handlers(ResourceID) -> maybe_install_wolff_telemetry_handlers(TelemetryId) ->
%% Attach event handlers for Kafka telemetry events. If a handler with the %% Attach event handlers for Kafka telemetry events. If a handler with the
%% handler id already exists, the attach_many function does nothing %% handler id already exists, the attach_many function does nothing
telemetry:attach_many( telemetry:attach_many(
%% unique handler id %% unique handler id
telemetry_handler_id(ResourceID), telemetry_handler_id(TelemetryId),
[ [
[wolff, dropped_queue_full], [wolff, dropped_queue_full],
[wolff, queuing], [wolff, queuing],
@ -798,7 +861,7 @@ maybe_install_wolff_telemetry_handlers(ResourceID) ->
%% wolff producers; otherwise, multiple kafka producer bridges %% wolff producers; otherwise, multiple kafka producer bridges
%% will install multiple handlers to the same wolff events, %% will install multiple handlers to the same wolff events,
%% multiplying the metric counts... %% multiplying the metric counts...
#{bridge_id => ResourceID} #{bridge_id => TelemetryId}
). ).
preproc_kafka_headers(HeadersTmpl) when HeadersTmpl =:= <<>>; HeadersTmpl =:= undefined -> preproc_kafka_headers(HeadersTmpl) when HeadersTmpl =:= <<>>; HeadersTmpl =:= undefined ->

View File

@ -26,7 +26,12 @@ schema_module() -> emqx_bridge_kafka.
connector_action_config_to_bridge_v1_config(ConnectorConfig, ActionConfig) -> connector_action_config_to_bridge_v1_config(ConnectorConfig, ActionConfig) ->
BridgeV1Config1 = maps:remove(<<"connector">>, ActionConfig), BridgeV1Config1 = maps:remove(<<"connector">>, ActionConfig),
BridgeV1Config2 = emqx_utils_maps:deep_merge(ConnectorConfig, BridgeV1Config1), BridgeV1Config2 = emqx_utils_maps:deep_merge(ConnectorConfig, BridgeV1Config1),
emqx_utils_maps:rename(<<"parameters">>, <<"kafka">>, BridgeV1Config2). BridgeV1Config = emqx_utils_maps:rename(<<"parameters">>, <<"kafka">>, BridgeV1Config2),
maps:update_with(
<<"kafka">>,
fun(Params) -> maps:with(v1_parameters(), Params) end,
BridgeV1Config
).
bridge_v1_config_to_action_config(BridgeV1Conf0 = #{<<"producer">> := _}, ConnectorName) -> bridge_v1_config_to_action_config(BridgeV1Conf0 = #{<<"producer">> := _}, ConnectorName) ->
%% Ancient v1 config, when `kafka' key was wrapped by `producer' %% Ancient v1 config, when `kafka' key was wrapped by `producer'
@ -51,6 +56,12 @@ bridge_v1_config_to_action_config(BridgeV1Conf, ConnectorName) ->
%% Internal helper functions %% Internal helper functions
%%------------------------------------------------------------------------------------------ %%------------------------------------------------------------------------------------------
v1_parameters() ->
[
to_bin(K)
|| {K, _} <- emqx_bridge_kafka:fields(v1_producer_kafka_opts)
].
producer_action_field_keys() -> producer_action_field_keys() ->
[ [
to_bin(K) to_bin(K)

View File

@ -477,7 +477,7 @@ do_start_producer(KafkaClientId, KafkaTopic) ->
ProducerConfig = ProducerConfig =
#{ #{
name => Name, name => Name,
partitioner => roundrobin, partitioner => random,
partition_count_refresh_interval_seconds => 1_000, partition_count_refresh_interval_seconds => 1_000,
replayq_max_total_bytes => 10_000, replayq_max_total_bytes => 10_000,
replayq_seg_bytes => 9_000, replayq_seg_bytes => 9_000,
@ -1520,7 +1520,7 @@ t_receive_after_recovery(Config) ->
key => <<"commit", (integer_to_binary(N))/binary>>, key => <<"commit", (integer_to_binary(N))/binary>>,
value => <<"commit", (integer_to_binary(N))/binary>> value => <<"commit", (integer_to_binary(N))/binary>>
} }
|| N <- lists:seq(1, NPartitions) || N <- lists:seq(1, NPartitions * 10)
], ],
%% we do distinct passes over this producing part so that %% we do distinct passes over this producing part so that
%% wolff won't batch everything together. %% wolff won't batch everything together.
@ -1933,7 +1933,7 @@ t_node_joins_existing_cluster(Config) ->
Val = <<"v", (integer_to_binary(N))/binary>>, Val = <<"v", (integer_to_binary(N))/binary>>,
publish(Config, KafkaTopic, [#{key => Key, value => Val}]) publish(Config, KafkaTopic, [#{key => Key, value => Val}])
end, end,
lists:seq(1, NPartitions) lists:seq(1, 10 * NPartitions)
), ),
{ok, _} = snabbkaffe:receive_events(SRef1), {ok, _} = snabbkaffe:receive_events(SRef1),

View File

@ -23,6 +23,7 @@
-include_lib("snabbkaffe/include/snabbkaffe.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl").
-include_lib("brod/include/brod.hrl"). -include_lib("brod/include/brod.hrl").
-include_lib("emqx_resource/include/emqx_resource.hrl"). -include_lib("emqx_resource/include/emqx_resource.hrl").
-include_lib("emqx/include/asserts.hrl").
-import(emqx_common_test_helpers, [on_exit/1]). -import(emqx_common_test_helpers, [on_exit/1]).
@ -165,6 +166,9 @@ send_message(Type, ActionName) ->
resolve_kafka_offset() -> resolve_kafka_offset() ->
KafkaTopic = emqx_bridge_kafka_impl_producer_SUITE:test_topic_one_partition(), KafkaTopic = emqx_bridge_kafka_impl_producer_SUITE:test_topic_one_partition(),
resolve_kafka_offset(KafkaTopic).
resolve_kafka_offset(KafkaTopic) ->
Partition = 0, Partition = 0,
Hosts = emqx_bridge_kafka_impl_producer_SUITE:kafka_hosts(), Hosts = emqx_bridge_kafka_impl_producer_SUITE:kafka_hosts(),
{ok, Offset0} = emqx_bridge_kafka_impl_producer_SUITE:resolve_kafka_offset( {ok, Offset0} = emqx_bridge_kafka_impl_producer_SUITE:resolve_kafka_offset(
@ -174,11 +178,32 @@ resolve_kafka_offset() ->
check_kafka_message_payload(Offset, ExpectedPayload) -> check_kafka_message_payload(Offset, ExpectedPayload) ->
KafkaTopic = emqx_bridge_kafka_impl_producer_SUITE:test_topic_one_partition(), KafkaTopic = emqx_bridge_kafka_impl_producer_SUITE:test_topic_one_partition(),
check_kafka_message_payload(KafkaTopic, Offset, ExpectedPayload).
check_kafka_message_payload(KafkaTopic, Offset, ExpectedPayload) ->
Partition = 0, Partition = 0,
Hosts = emqx_bridge_kafka_impl_producer_SUITE:kafka_hosts(), Hosts = emqx_bridge_kafka_impl_producer_SUITE:kafka_hosts(),
{ok, {_, [KafkaMsg0]}} = brod:fetch(Hosts, KafkaTopic, Partition, Offset), {ok, {_, [KafkaMsg0]}} = brod:fetch(Hosts, KafkaTopic, Partition, Offset),
?assertMatch(#kafka_message{value = ExpectedPayload}, KafkaMsg0). ?assertMatch(#kafka_message{value = ExpectedPayload}, KafkaMsg0).
ensure_kafka_topic(KafkaTopic) ->
TopicConfigs = [
#{
name => KafkaTopic,
num_partitions => 1,
replication_factor => 1,
assignments => [],
configs => []
}
],
RequestConfig = #{timeout => 5_000},
ConnConfig = #{},
Endpoints = emqx_bridge_kafka_impl_producer_SUITE:kafka_hosts(),
case brod:create_topics(Endpoints, TopicConfigs, RequestConfig, ConnConfig) of
ok -> ok;
{error, topic_already_exists} -> ok
end.
action_config(ConnectorName) -> action_config(ConnectorName) ->
action_config(ConnectorName, _Overrides = #{}). action_config(ConnectorName, _Overrides = #{}).
@ -728,9 +753,13 @@ t_invalid_partition_count_metrics(Config) ->
%% Simulate `invalid_partition_count' %% Simulate `invalid_partition_count'
emqx_common_test_helpers:with_mock( emqx_common_test_helpers:with_mock(
wolff, wolff,
send_sync, send_sync2,
fun(_Producers, _Msgs, _Timeout) -> fun(_Producers, _Topic, _Msgs, _Timeout) ->
error({invalid_partition_count, 0, partitioner}) throw(#{
cause => invalid_partition_count,
count => 0,
partitioner => partitioner
})
end, end,
fun() -> fun() ->
{{ok, _}, {ok, _}} = {{ok, _}, {ok, _}} =
@ -773,9 +802,13 @@ t_invalid_partition_count_metrics(Config) ->
%% Simulate `invalid_partition_count' %% Simulate `invalid_partition_count'
emqx_common_test_helpers:with_mock( emqx_common_test_helpers:with_mock(
wolff, wolff,
send, send2,
fun(_Producers, _Msgs, _Timeout) -> fun(_Producers, _Topic, _Msgs, _AckCallback) ->
error({invalid_partition_count, 0, partitioner}) throw(#{
cause => invalid_partition_count,
count => 0,
partitioner => partitioner
})
end, end,
fun() -> fun() ->
{{ok, _}, {ok, _}} = {{ok, _}, {ok, _}} =
@ -881,3 +914,131 @@ t_multiple_actions_sharing_topic(Config) ->
end end
), ),
ok. ok.
%% Smoke tests for using a templated topic and adynamic kafka topics.
t_dynamic_topics(Config) ->
Type = proplists:get_value(type, Config, ?TYPE),
ConnectorName = proplists:get_value(connector_name, Config, <<"c">>),
ConnectorConfig = proplists:get_value(connector_config, Config, connector_config()),
ActionName = <<"dynamic_topics">>,
ActionConfig1 = proplists:get_value(action_config, Config, action_config(ConnectorName)),
PreConfiguredTopic1 = <<"pct1">>,
PreConfiguredTopic2 = <<"pct2">>,
ensure_kafka_topic(PreConfiguredTopic1),
ensure_kafka_topic(PreConfiguredTopic2),
ActionConfig = emqx_bridge_v2_testlib:parse_and_check(
action,
Type,
ActionName,
emqx_utils_maps:deep_merge(
ActionConfig1,
#{
<<"parameters">> => #{
<<"topic">> => <<"pct${.payload.n}">>,
<<"message">> => #{
<<"key">> => <<"${.clientid}">>,
<<"value">> => <<"${.payload.p}">>
}
}
}
)
),
?check_trace(
#{timetrap => 7_000},
begin
ConnectorParams = [
{connector_config, ConnectorConfig},
{connector_name, ConnectorName},
{connector_type, Type}
],
ActionParams = [
{action_config, ActionConfig},
{action_name, ActionName},
{action_type, Type}
],
{ok, {{_, 201, _}, _, #{}}} =
emqx_bridge_v2_testlib:create_connector_api(ConnectorParams),
{ok, {{_, 201, _}, _, #{}}} =
emqx_bridge_v2_testlib:create_action_api(ActionParams),
RuleTopic = <<"pct">>,
{ok, _} = emqx_bridge_v2_testlib:create_rule_and_action_http(
Type,
RuleTopic,
[
{bridge_name, ActionName}
],
#{
sql =>
<<"select *, json_decode(payload) as payload from \"", RuleTopic/binary,
"\" ">>
}
),
?assertStatusAPI(Type, ActionName, <<"connected">>),
HandlerId = ?FUNCTION_NAME,
TestPid = self(),
telemetry:attach_many(
HandlerId,
emqx_resource_metrics:events(),
fun(EventName, Measurements, Metadata, _Config) ->
Data = #{
name => EventName,
measurements => Measurements,
metadata => Metadata
},
TestPid ! {telemetry, Data},
ok
end,
unused_config
),
on_exit(fun() -> telemetry:detach(HandlerId) end),
{ok, C} = emqtt:start_link(#{}),
{ok, _} = emqtt:connect(C),
Payload = fun(Map) -> emqx_utils_json:encode(Map) end,
Offset1 = resolve_kafka_offset(PreConfiguredTopic1),
Offset2 = resolve_kafka_offset(PreConfiguredTopic2),
{ok, _} = emqtt:publish(C, RuleTopic, Payload(#{n => 1, p => <<"p1">>}), [{qos, 1}]),
{ok, _} = emqtt:publish(C, RuleTopic, Payload(#{n => 2, p => <<"p2">>}), [{qos, 1}]),
check_kafka_message_payload(PreConfiguredTopic1, Offset1, <<"p1">>),
check_kafka_message_payload(PreConfiguredTopic2, Offset2, <<"p2">>),
ActionId = emqx_bridge_v2:id(Type, ActionName),
?assertEqual(2, emqx_resource_metrics:matched_get(ActionId)),
?assertEqual(2, emqx_resource_metrics:success_get(ActionId)),
?assertEqual(0, emqx_resource_metrics:queuing_get(ActionId)),
?assertReceive(
{telemetry, #{
measurements := #{gauge_set := _},
metadata := #{worker_id := _, resource_id := ActionId}
}}
),
%% If there isn't enough information in the context to resolve to a topic, it
%% should be an unrecoverable error.
?assertMatch(
{_, {ok, _}},
?wait_async_action(
emqtt:publish(C, RuleTopic, Payload(#{not_enough => <<"info">>}), [{qos, 1}]),
#{?snk_kind := "kafka_producer_failed_to_render_topic"}
)
),
%% If it's possible to render the topic, but it isn't in the pre-configured
%% list, it should be an unrecoverable error.
?assertMatch(
{_, {ok, _}},
?wait_async_action(
emqtt:publish(C, RuleTopic, Payload(#{n => 99}), [{qos, 1}]),
#{?snk_kind := "kafka_producer_resolved_to_unknown_topic"}
)
),
ok
end,
[]
),
ok.

View File

@ -0,0 +1,5 @@
Kafka producer action's `topic` config now supports templates.
The topics must be already created in Kafka. If a message is rendered towards a non-existing topic in Kafka (given Kafka disabled topic auto-creation), the message will fail with an unrecoverable error. Also, if a message does not contain enough information to render to the configured template (e.g.: the template is `t-${t}` and the message context does not define `t`), this message will also fail with an unrecoverable error.
This same feature is also available for Azure Event Hubs and Confluent Platform producer integrations.

View File

@ -361,7 +361,7 @@ defmodule EMQXUmbrella.MixProject do
{:hstreamdb_erl, {:hstreamdb_erl,
github: "hstreamdb/hstreamdb_erl", tag: "0.5.18+v0.18.1+ezstd-v1.0.5-emqx1"}, github: "hstreamdb/hstreamdb_erl", tag: "0.5.18+v0.18.1+ezstd-v1.0.5-emqx1"},
{:influxdb, github: "emqx/influxdb-client-erl", tag: "1.1.13", override: true}, {:influxdb, github: "emqx/influxdb-client-erl", tag: "1.1.13", override: true},
{:wolff, github: "kafka4beam/wolff", tag: "2.0.0"}, {:wolff, github: "kafka4beam/wolff", tag: "3.0.2"},
{:kafka_protocol, github: "kafka4beam/kafka_protocol", tag: "4.1.5", override: true}, {:kafka_protocol, github: "kafka4beam/kafka_protocol", tag: "4.1.5", override: true},
{:brod_gssapi, github: "kafka4beam/brod_gssapi", tag: "v0.1.1"}, {:brod_gssapi, github: "kafka4beam/brod_gssapi", tag: "v0.1.1"},
{:brod, github: "kafka4beam/brod", tag: "3.18.0"}, {:brod, github: "kafka4beam/brod", tag: "3.18.0"},

View File

@ -69,7 +69,7 @@ producer_kafka_opts.label:
"""Azure Event Hubs Producer""" """Azure Event Hubs Producer"""
kafka_topic.desc: kafka_topic.desc:
"""Event Hubs name""" """Event Hubs name. Supports templates (e.g.: `t-${payload.t}`)."""
kafka_topic.label: kafka_topic.label:
"""Event Hubs Name""" """Event Hubs Name"""

View File

@ -69,10 +69,10 @@ producer_kafka_opts.label:
"""Confluent Producer""" """Confluent Producer"""
kafka_topic.desc: kafka_topic.desc:
"""Event Hub name""" """Kafka topic name. Supports templates (e.g.: `t-${payload.t}`)."""
kafka_topic.label: kafka_topic.label:
"""Event Hub Name""" """Kafka Topic Name"""
kafka_message_timestamp.desc: kafka_message_timestamp.desc:
"""Which timestamp to use. The timestamp is expected to be a millisecond precision Unix epoch which can be in string format, e.g. <code>1661326462115</code> or <code>'1661326462115'</code>. When the desired data field for this template is not found, or if the found data is not a valid integer, the current system timestamp will be used.""" """Which timestamp to use. The timestamp is expected to be a millisecond precision Unix epoch which can be in string format, e.g. <code>1661326462115</code> or <code>'1661326462115'</code>. When the desired data field for this template is not found, or if the found data is not a valid integer, the current system timestamp will be used."""

View File

@ -81,7 +81,7 @@ producer_kafka_opts.label:
"""Kafka Producer""" """Kafka Producer"""
kafka_topic.desc: kafka_topic.desc:
"""Kafka topic name""" """Kafka topic name. Supports templates (e.g.: `t-${payload.t}`)."""
kafka_topic.label: kafka_topic.label:
"""Kafka Topic Name""" """Kafka Topic Name"""
@ -446,5 +446,4 @@ server_name_indication.desc:
server_name_indication.label: server_name_indication.label:
"""SNI""" """SNI"""
} }