From 65c15b3faeb56268482c81e057490ef547f96357 Mon Sep 17 00:00:00 2001
From: Thales Macedo Garitezi
Date: Wed, 1 Mar 2023 16:12:24 -0300
Subject: [PATCH] refactor(kafka_consumer): move subscriber startup logic to separate fn

---
 .../kafka/emqx_bridge_impl_kafka_consumer.erl | 144 ++++++++++--------
 1 file changed, 80 insertions(+), 64 deletions(-)

diff --git a/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_consumer.erl b/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_consumer.erl
index f0480f2d6..076d7fd97 100644
--- a/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_consumer.erl
+++ b/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_consumer.erl
@@ -82,37 +82,21 @@ is_buffer_supported() ->
 
 -spec on_start(manager_id(), config()) -> {ok, state()}.
 on_start(InstanceId, Config) ->
-    ensure_consumer_supervisor_started(),
     #{
         authentication := Auth,
         bootstrap_hosts := BootstrapHosts0,
         bridge_name := BridgeName,
-        hookpoint := Hookpoint,
+        hookpoint := _,
         kafka := #{
-            max_batch_bytes := MaxBatchBytes,
-            max_rejoin_attempts := MaxRejoinAttempts,
-            offset_reset_policy := OffsetResetPolicy,
-            topic := KafkaTopic
+            max_batch_bytes := _,
+            max_rejoin_attempts := _,
+            offset_reset_policy := _,
+            topic := _
         },
-        mqtt := #{topic := MQTTTopic, qos := MQTTQoS, payload := MQTTPayload},
+        mqtt := #{topic := _, qos := _, payload := _},
         ssl := SSL
     } = Config,
     BootstrapHosts = emqx_bridge_impl_kafka:hosts(BootstrapHosts0),
-    GroupConfig = [{max_rejoin_attempts, MaxRejoinAttempts}],
-    ConsumerConfig = [
-        {max_bytes, MaxBatchBytes},
-        {offset_reset_policy, OffsetResetPolicy}
-    ],
-    InitialState = #{
-        resource_id => emqx_bridge_resource:resource_id(kafka_consumer, BridgeName),
-        mqtt => #{
-            payload => MQTTPayload,
-            topic => MQTTTopic,
-            qos => MQTTQoS
-        },
-        hookpoint => Hookpoint,
-        kafka_topic => KafkaTopic
-    },
     KafkaType = kafka_consumer,
     %% Note: this is distinct per node.
     ClientID0 = emqx_bridge_impl_kafka:make_client_id(KafkaType, BridgeName),
@@ -143,48 +127,7 @@ on_start(InstanceId, Config) ->
             }),
             throw(failed_to_start_kafka_client)
     end,
-    %% note: the group id should be the same for all nodes in the
-    %% cluster, so that the load gets distributed between all
-    %% consumers and we don't repeat messages in the same cluster.
-    GroupID = consumer_group_id(BridgeName),
-    GroupSubscriberConfig =
-        #{
-            client => ClientID,
-            group_id => GroupID,
-            topics => [KafkaTopic],
-            cb_module => ?MODULE,
-            init_data => InitialState,
-            message_type => message,
-            consumer_config => ConsumerConfig,
-            group_config => GroupConfig
-        },
-    %% Below, we spawn a single `brod_group_consumer_v2' worker, with
-    %% no option for a pool of those. This is because that worker
-    %% spawns one worker for each assigned topic-partition
-    %% automatically, so we should not spawn duplicate workers.
-    SubscriberId = make_subscriber_id(BridgeName),
-    case emqx_ee_bridge_kafka_consumer_sup:start_child(SubscriberId, GroupSubscriberConfig) of
-        {ok, _ConsumerPid} ->
-            ?tp(
-                kafka_consumer_subscriber_started,
-                #{instance_id => InstanceId, subscriber_id => SubscriberId}
-            ),
-            {ok, #{
-                subscriber_id => SubscriberId,
-                kafka_client_id => ClientID,
-                kafka_topic => KafkaTopic
-            }};
-        {error, Reason2} ->
-            ?SLOG(error, #{
-                msg => "failed_to_start_kafka_consumer",
-                instance_id => InstanceId,
-                kafka_hosts => BootstrapHosts,
-                kafka_topic => KafkaTopic,
-                reason => Reason2
-            }),
-            stop_client(ClientID),
-            throw(failed_to_start_kafka_consumer)
-    end.
+    start_subscriber(Config, InstanceId, ClientID).
 
 -spec on_stop(manager_id(), state()) -> ok.
 on_stop(_InstanceID, State) ->
@@ -296,6 +239,79 @@ ensure_consumer_supervisor_started() ->
             ok
     end.
 
+-spec start_subscriber(config(), manager_id(), brod:client_id()) -> {ok, state()}.
+start_subscriber(Config, InstanceId, ClientID) ->
+    #{
+        bootstrap_hosts := BootstrapHosts0,
+        bridge_name := BridgeName,
+        hookpoint := Hookpoint,
+        kafka := #{
+            max_batch_bytes := MaxBatchBytes,
+            max_rejoin_attempts := MaxRejoinAttempts,
+            offset_reset_policy := OffsetResetPolicy,
+            topic := KafkaTopic
+        },
+        mqtt := #{topic := MQTTTopic, qos := MQTTQoS, payload := MQTTPayload}
+    } = Config,
+    ensure_consumer_supervisor_started(),
+    InitialState = #{
+        resource_id => emqx_bridge_resource:resource_id(kafka_consumer, BridgeName),
+        mqtt => #{
+            payload => MQTTPayload,
+            topic => MQTTTopic,
+            qos => MQTTQoS
+        },
+        hookpoint => Hookpoint,
+        kafka_topic => KafkaTopic
+    },
+    %% note: the group id should be the same for all nodes in the
+    %% cluster, so that the load gets distributed between all
+    %% consumers and we don't repeat messages in the same cluster.
+    GroupID = consumer_group_id(BridgeName),
+    ConsumerConfig = [
+        {max_bytes, MaxBatchBytes},
+        {offset_reset_policy, OffsetResetPolicy}
+    ],
+    GroupConfig = [{max_rejoin_attempts, MaxRejoinAttempts}],
+    GroupSubscriberConfig =
+        #{
+            client => ClientID,
+            group_id => GroupID,
+            topics => [KafkaTopic],
+            cb_module => ?MODULE,
+            init_data => InitialState,
+            message_type => message,
+            consumer_config => ConsumerConfig,
+            group_config => GroupConfig
+        },
+    %% Below, we spawn a single `brod_group_consumer_v2' worker, with
+    %% no option for a pool of those. This is because that worker
+    %% spawns one worker for each assigned topic-partition
+    %% automatically, so we should not spawn duplicate workers.
+    SubscriberId = make_subscriber_id(BridgeName),
+    case emqx_ee_bridge_kafka_consumer_sup:start_child(SubscriberId, GroupSubscriberConfig) of
+        {ok, _ConsumerPid} ->
+            ?tp(
+                kafka_consumer_subscriber_started,
+                #{instance_id => InstanceId, subscriber_id => SubscriberId}
+            ),
+            {ok, #{
+                subscriber_id => SubscriberId,
+                kafka_client_id => ClientID,
+                kafka_topic => KafkaTopic
+            }};
+        {error, Reason2} ->
+            ?SLOG(error, #{
+                msg => "failed_to_start_kafka_consumer",
+                instance_id => InstanceId,
+                kafka_hosts => emqx_bridge_impl_kafka:hosts(BootstrapHosts0),
+                kafka_topic => KafkaTopic,
+                reason => Reason2
+            }),
+            stop_client(ClientID),
+            throw(failed_to_start_kafka_consumer)
+    end.
+
 -spec stop_subscriber(emqx_ee_bridge_kafka_consumer_sup:child_id()) -> ok.
 stop_subscriber(SubscriberId) ->
     _ = log_when_error(