refactor(kafka_consumer): move subscriber startup logic to separate fn

This commit is contained in:
Thales Macedo Garitezi 2023-03-01 16:12:24 -03:00
parent 969fa03ecc
commit 65c15b3fae
1 changed files with 80 additions and 64 deletions

View File

@ -82,37 +82,21 @@ is_buffer_supported() ->
-spec on_start(manager_id(), config()) -> {ok, state()}. -spec on_start(manager_id(), config()) -> {ok, state()}.
on_start(InstanceId, Config) -> on_start(InstanceId, Config) ->
ensure_consumer_supervisor_started(),
#{ #{
authentication := Auth, authentication := Auth,
bootstrap_hosts := BootstrapHosts0, bootstrap_hosts := BootstrapHosts0,
bridge_name := BridgeName, bridge_name := BridgeName,
hookpoint := Hookpoint, hookpoint := _,
kafka := #{ kafka := #{
max_batch_bytes := MaxBatchBytes, max_batch_bytes := _,
max_rejoin_attempts := MaxRejoinAttempts, max_rejoin_attempts := _,
offset_reset_policy := OffsetResetPolicy, offset_reset_policy := _,
topic := KafkaTopic topic := _
}, },
mqtt := #{topic := MQTTTopic, qos := MQTTQoS, payload := MQTTPayload}, mqtt := #{topic := _, qos := _, payload := _},
ssl := SSL ssl := SSL
} = Config, } = Config,
BootstrapHosts = emqx_bridge_impl_kafka:hosts(BootstrapHosts0), BootstrapHosts = emqx_bridge_impl_kafka:hosts(BootstrapHosts0),
GroupConfig = [{max_rejoin_attempts, MaxRejoinAttempts}],
ConsumerConfig = [
{max_bytes, MaxBatchBytes},
{offset_reset_policy, OffsetResetPolicy}
],
InitialState = #{
resource_id => emqx_bridge_resource:resource_id(kafka_consumer, BridgeName),
mqtt => #{
payload => MQTTPayload,
topic => MQTTTopic,
qos => MQTTQoS
},
hookpoint => Hookpoint,
kafka_topic => KafkaTopic
},
KafkaType = kafka_consumer, KafkaType = kafka_consumer,
%% Note: this is distinct per node. %% Note: this is distinct per node.
ClientID0 = emqx_bridge_impl_kafka:make_client_id(KafkaType, BridgeName), ClientID0 = emqx_bridge_impl_kafka:make_client_id(KafkaType, BridgeName),
@ -143,48 +127,7 @@ on_start(InstanceId, Config) ->
}), }),
throw(failed_to_start_kafka_client) throw(failed_to_start_kafka_client)
end, end,
%% note: the group id should be the same for all nodes in the start_subscriber(Config, InstanceId, ClientID).
%% cluster, so that the load gets distributed between all
%% consumers and we don't repeat messages in the same cluster.
GroupID = consumer_group_id(BridgeName),
GroupSubscriberConfig =
#{
client => ClientID,
group_id => GroupID,
topics => [KafkaTopic],
cb_module => ?MODULE,
init_data => InitialState,
message_type => message,
consumer_config => ConsumerConfig,
group_config => GroupConfig
},
%% Below, we spawn a single `brod_group_consumer_v2' worker, with
%% no option for a pool of those. This is because that worker
%% spawns one worker for each assigned topic-partition
%% automatically, so we should not spawn duplicate workers.
SubscriberId = make_subscriber_id(BridgeName),
case emqx_ee_bridge_kafka_consumer_sup:start_child(SubscriberId, GroupSubscriberConfig) of
{ok, _ConsumerPid} ->
?tp(
kafka_consumer_subscriber_started,
#{instance_id => InstanceId, subscriber_id => SubscriberId}
),
{ok, #{
subscriber_id => SubscriberId,
kafka_client_id => ClientID,
kafka_topic => KafkaTopic
}};
{error, Reason2} ->
?SLOG(error, #{
msg => "failed_to_start_kafka_consumer",
instance_id => InstanceId,
kafka_hosts => BootstrapHosts,
kafka_topic => KafkaTopic,
reason => Reason2
}),
stop_client(ClientID),
throw(failed_to_start_kafka_consumer)
end.
-spec on_stop(manager_id(), state()) -> ok. -spec on_stop(manager_id(), state()) -> ok.
on_stop(_InstanceID, State) -> on_stop(_InstanceID, State) ->
@ -296,6 +239,79 @@ ensure_consumer_supervisor_started() ->
ok ok
end. end.
%% Starts the Kafka consumer group subscriber for a bridge, using an
%% already-started Kafka client identified by `ClientID'.
%%
%% Reads the bridge name, hookpoint, Kafka consumer settings and MQTT
%% settings from `Config', makes sure the consumer supervisor is
%% running, and then asks that supervisor to start one subscriber child.
%%
%% Returns `{ok, State}' where `State' carries the subscriber id, the
%% Kafka client id and the Kafka topic.  If the child cannot be started,
%% the failure is logged, the Kafka client is stopped and
%% `failed_to_start_kafka_consumer' is thrown.
-spec start_subscriber(config(), manager_id(), brod:client_id()) -> {ok, state()}.
start_subscriber(Config, InstanceId, ClientID) ->
    %% A single match on purpose: a config missing any of these keys
    %% fails here, before anything is started.
    #{
        bridge_name := BridgeName,
        bootstrap_hosts := RawHosts,
        hookpoint := Hookpoint,
        mqtt := #{payload := Payload, qos := QoS, topic := Topic},
        kafka := #{
            topic := KafkaTopic,
            offset_reset_policy := ResetPolicy,
            max_rejoin_attempts := RejoinAttempts,
            max_batch_bytes := BatchBytes
        }
    } = Config,
    ensure_consumer_supervisor_started(),
    ResourceId = emqx_bridge_resource:resource_id(kafka_consumer, BridgeName),
    %% Data handed to the callback module (`init_data' below).
    InitData = #{
        kafka_topic => KafkaTopic,
        hookpoint => Hookpoint,
        mqtt => #{topic => Topic, qos => QoS, payload => Payload},
        resource_id => ResourceId
    },
    SubscriberOpts = #{
        client => ClientID,
        %% The group id is derived from the bridge name only; it is meant
        %% to be identical on every node of the cluster so that the load
        %% is shared between all consumers and messages are not repeated
        %% within the same cluster.
        group_id => consumer_group_id(BridgeName),
        topics => [KafkaTopic],
        cb_module => ?MODULE,
        init_data => InitData,
        message_type => message,
        consumer_config => [
            {max_bytes, BatchBytes},
            {offset_reset_policy, ResetPolicy}
        ],
        group_config => [{max_rejoin_attempts, RejoinAttempts}]
    },
    %% Exactly one group subscriber worker is started, with no pool of
    %% them: that worker is expected to spawn one worker per assigned
    %% topic-partition by itself, so extra workers would be duplicates.
    SubscriberId = make_subscriber_id(BridgeName),
    StartResult = emqx_ee_bridge_kafka_consumer_sup:start_child(SubscriberId, SubscriberOpts),
    case StartResult of
        {error, Reason} ->
            ?SLOG(error, #{
                msg => "failed_to_start_kafka_consumer",
                instance_id => InstanceId,
                kafka_hosts => emqx_bridge_impl_kafka:hosts(RawHosts),
                kafka_topic => KafkaTopic,
                reason => Reason
            }),
            %% The client was started by the caller; do not leave it
            %% running when the subscriber could not be started.
            stop_client(ClientID),
            throw(failed_to_start_kafka_consumer);
        {ok, _SubscriberPid} ->
            ?tp(
                kafka_consumer_subscriber_started,
                #{instance_id => InstanceId, subscriber_id => SubscriberId}
            ),
            {ok, #{
                subscriber_id => SubscriberId,
                kafka_client_id => ClientID,
                kafka_topic => KafkaTopic
            }}
    end.
-spec stop_subscriber(emqx_ee_bridge_kafka_consumer_sup:child_id()) -> ok. -spec stop_subscriber(emqx_ee_bridge_kafka_consumer_sup:child_id()) -> ok.
stop_subscriber(SubscriberId) -> stop_subscriber(SubscriberId) ->
_ = log_when_error( _ = log_when_error(