diff --git a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_kafka.erl b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_kafka.erl index 89cad3421..f812901be 100644 --- a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_kafka.erl +++ b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_kafka.erl @@ -274,7 +274,9 @@ fields(consumer_kafka_opts) -> })}, {max_rejoin_attempts, mk(non_neg_integer(), #{ - default => 5, desc => ?DESC(consumer_max_rejoin_attempts) + hidden => true, + default => 5, + desc => ?DESC(consumer_max_rejoin_attempts) })}, {offset_reset_policy, mk( diff --git a/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_consumer.erl b/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_consumer.erl index ab4a996bd..f89b63d7b 100644 --- a/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_consumer.erl +++ b/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_consumer.erl @@ -129,7 +129,7 @@ on_start(InstanceId, Config) -> }), throw(failed_to_start_kafka_client) end, - start_subscriber(Config, InstanceId, ClientID). + start_consumer(Config, InstanceId, ClientID). -spec on_stop(manager_id(), state()) -> ok. on_stop(_InstanceID, State) -> @@ -169,44 +169,45 @@ handle_message(Message, State) -> ?tp_span( kafka_consumer_handle_message, #{message => Message, state => State}, - begin - #{ - resource_id := ResourceId, - hookpoint := Hookpoint, - kafka_topic := KafkaTopic, - mqtt := #{ - topic := MQTTTopic, - payload := MQTTPayload, - qos := MQTTQoS - } - } = State, - FullMessage = #{ - offset => Message#kafka_message.offset, - key => Message#kafka_message.key, - value => Message#kafka_message.value, - ts => Message#kafka_message.ts, - ts_type => Message#kafka_message.ts_type, - headers => maps:from_list(Message#kafka_message.headers), - topic => KafkaTopic - }, - Payload = - case MQTTPayload of - full_message -> - FullMessage; - message_value -> - Message#kafka_message.value - end, - EncodedPayload = emqx_json:encode(Payload), - MQTTMessage = emqx_message:make(ResourceId, MQTTQoS, MQTTTopic, EncodedPayload), - _ = emqx:publish(MQTTMessage), - emqx:run_hook(Hookpoint, [FullMessage]), - emqx_resource_metrics:received_inc(ResourceId), - %% note: just `ack' does not commit the offset to the - %% kafka consumer group. - {ok, commit, State} - end + do_handle_message(Message, State) ). +do_handle_message(Message, State) -> + #{ + resource_id := ResourceId, + hookpoint := Hookpoint, + kafka_topic := KafkaTopic, + mqtt := #{ + topic := MQTTTopic, + payload := MQTTPayload, + qos := MQTTQoS + } + } = State, + FullMessage = #{ + offset => Message#kafka_message.offset, + key => Message#kafka_message.key, + value => Message#kafka_message.value, + ts => Message#kafka_message.ts, + ts_type => Message#kafka_message.ts_type, + headers => maps:from_list(Message#kafka_message.headers), + topic => KafkaTopic + }, + Payload = + case MQTTPayload of + full_message -> + FullMessage; + message_value -> + Message#kafka_message.value + end, + EncodedPayload = emqx_json:encode(Payload), + MQTTMessage = emqx_message:make(ResourceId, MQTTQoS, MQTTTopic, EncodedPayload), + _ = emqx:publish(MQTTMessage), + emqx:run_hook(Hookpoint, [FullMessage]), + emqx_resource_metrics:received_inc(ResourceId), + %% note: just `ack' does not commit the offset to the + %% kafka consumer group. + {ok, commit, State}. + %%------------------------------------------------------------------------------------- %% Helper fns %%------------------------------------------------------------------------------------- @@ -241,8 +242,8 @@ ensure_consumer_supervisor_started() -> ok end. --spec start_subscriber(config(), manager_id(), brod:client_id()) -> {ok, state()}. -start_subscriber(Config, InstanceId, ClientID) -> +-spec start_consumer(config(), manager_id(), brod:client_id()) -> {ok, state()}. +start_consumer(Config, InstanceId, ClientID) -> #{ bootstrap_hosts := BootstrapHosts0, bridge_name := BridgeName, @@ -256,7 +257,7 @@ start_subscriber(Config, InstanceId, ClientID) -> }, mqtt := #{topic := MQTTTopic, qos := MQTTQoS, payload := MQTTPayload} } = Config, - ensure_consumer_supervisor_started(), + ok = ensure_consumer_supervisor_started(), InitialState = #{ resource_id => emqx_bridge_resource:resource_id(kafka_consumer, BridgeName), mqtt => #{