diff --git a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_consumer.erl b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_consumer.erl index 115266252..6cfcf7d5d 100644 --- a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_consumer.erl +++ b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_consumer.erl @@ -120,7 +120,7 @@ %% Allocatable resources -define(kafka_client_id, kafka_client_id). --define(kafka_subscriber_ids, kafka_subscriber_ids). +-define(kafka_subscriber_id, kafka_subscriber_id). %%------------------------------------------------------------------------------------- %% `emqx_resource' API @@ -180,19 +180,27 @@ on_start(ConnectorResId, Config) -> installed_sources => #{} }}. --spec on_stop(resource_id(), connector_state()) -> ok. +-spec on_stop(connector_resource_id(), connector_state()) -> ok. on_stop(ConnectorResId, _State = undefined) -> - case emqx_resource:get_allocated_resources(ConnectorResId) of - #{?kafka_client_id := ClientID, ?kafka_subscriber_ids := SubscriberIds} -> - lists:foreach(fun stop_subscriber/1, SubscriberIds), - stop_client(ClientID), + SubscribersStopped = + maps:fold( + fun + (?kafka_client_id, ClientID, Acc) -> + stop_client(ClientID), + Acc; + ({?kafka_subscriber_id, _SourceResId}, SubscriberId, Acc) -> + stop_subscriber(SubscriberId), + Acc + 1 + end, + 0, + emqx_resource:get_allocated_resources(ConnectorResId) + ), + case SubscribersStopped > 0 of + true -> ?tp(kafka_consumer_subcriber_and_client_stopped, #{}), ok; - #{?kafka_client_id := ClientID} -> - stop_client(ClientID), + false -> ?tp(kafka_consumer_just_client_stopped, #{}), - ok; - _ -> ok end; on_stop(ConnectorResId, State) -> @@ -253,7 +261,7 @@ on_remove_channel(ConnectorResId, ConnectorState0, SourceResId) -> {SourceState, InstalledSources} -> #{subscriber_id := SubscriberId} = SourceState, stop_subscriber(SubscriberId), - deallocate_subscriber_id(ConnectorResId, SubscriberId), + deallocate_subscriber_id(ConnectorResId, SourceResId), ok; error -> InstalledSources = InstalledSources0 @@ -404,6 +412,7 @@ start_consumer(Config, ConnectorResId, SourceResId, ClientID) -> } = Params0 } = Config, ok = ensure_consumer_supervisor_started(), + ?tp(kafka_consumer_sup_started, #{}), TopicMapping = ensure_topic_mapping(Params0), InitialState = #{ key_encoding_mode => KeyEncodingMode, @@ -450,7 +459,7 @@ start_consumer(Config, ConnectorResId, SourceResId, ClientID) -> %% automatically, so we should not spawn duplicate workers. SubscriberId = make_subscriber_id(BridgeName), ?tp(kafka_consumer_about_to_start_subscriber, #{}), - ok = allocate_subscriber_id(ConnectorResId, SubscriberId), + ok = allocate_subscriber_id(ConnectorResId, SourceResId, SubscriberId), ?tp(kafka_consumer_subscriber_allocated, #{}), case emqx_bridge_kafka_consumer_sup:start_child(SubscriberId, GroupSubscriberConfig) of {ok, _ConsumerPid} -> @@ -689,18 +698,15 @@ infer_client_error(Error) -> undefined end. -allocate_subscriber_id(ConnectorResId, SubscriberId) -> - AllocatedResources = emqx_resource:get_allocated_resources(ConnectorResId), - AllocatedSubscriberIds0 = maps:get(?kafka_subscriber_ids, AllocatedResources, []), - AllocatedSubscriberIds = lists:usort([SubscriberId | AllocatedSubscriberIds0]), +allocate_subscriber_id(ConnectorResId, SourceResId, SubscriberId) -> ok = emqx_resource:allocate_resource( - ConnectorResId, ?kafka_subscriber_ids, AllocatedSubscriberIds + ConnectorResId, + {?kafka_subscriber_id, SourceResId}, + SubscriberId ). -deallocate_subscriber_id(ConnectorResId, SubscriberId) -> - AllocatedResources = emqx_resource:get_allocated_resources(ConnectorResId), - AllocatedSubscriberIds0 = maps:get(?kafka_subscriber_ids, AllocatedResources, []), - AllocatedSubscriberIds = AllocatedSubscriberIds0 -- [SubscriberId], - ok = emqx_resource:allocate_resource( - ConnectorResId, ?kafka_subscriber_ids, AllocatedSubscriberIds +deallocate_subscriber_id(ConnectorResId, SourceResId) -> + ok = emqx_resource:deallocate_resource( + ConnectorResId, + {?kafka_subscriber_id, SourceResId} ). diff --git a/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_consumer_SUITE.erl b/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_consumer_SUITE.erl index b6827235e..fe7353f68 100644 --- a/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_consumer_SUITE.erl +++ b/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_consumer_SUITE.erl @@ -2158,7 +2158,7 @@ t_resource_manager_crash_before_subscriber_started(Config) -> ?check_trace( begin ?force_ordering( - #{?snk_kind := kafka_consumer_client_started}, + #{?snk_kind := kafka_consumer_sup_started}, #{?snk_kind := will_kill_resource_manager} ), ?force_ordering(