refactor: use individual keys for subscriber id resources

Author: Thales Macedo Garitezi
Date:   2024-02-29 10:05:16 -03:00
Parent: 0aa9a872a3
Commit: 16b3dc1166

2 changed files with 30 additions and 24 deletions
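
Previously a connector kept every subscriber id in one list stored under the single ?kafka_subscriber_ids key, so attaching or detaching a source meant reading the current list, editing it, and writing it back. After this change each source gets its own {?kafka_subscriber_id, SourceResId} key holding exactly one subscriber id, so allocation and deallocation become single calls with no read-modify-write step. The sketch below illustrates the two bookkeeping shapes with a plain map standing in for the connector's allocated-resources table; only emqx_resource:allocate_resource/3, emqx_resource:deallocate_resource/2 and emqx_resource:get_allocated_resources/1 appear in the diff itself, everything else here is illustrative.

%% Illustrative sketch only: a plain map stands in for the per-connector
%% allocated-resources table kept by `emqx_resource'; the real code uses
%% emqx_resource:allocate_resource/3, emqx_resource:deallocate_resource/2
%% and emqx_resource:get_allocated_resources/1 as shown in the diff below.
-module(subscriber_id_keys_sketch).
-export([old_allocate/2, new_allocate/3, new_deallocate/2, demo/0]).

%% Old bookkeeping: all subscriber ids in one list under a single shared key,
%% so every allocation is a read-modify-write of that list.
old_allocate(Resources, SubscriberId) ->
    Ids0 = maps:get(kafka_subscriber_ids, Resources, []),
    Resources#{kafka_subscriber_ids => lists:usort([SubscriberId | Ids0])}.

%% New bookkeeping: one {kafka_subscriber_id, SourceResId} key per source,
%% holding exactly one subscriber id.
new_allocate(Resources, SourceResId, SubscriberId) ->
    Resources#{{kafka_subscriber_id, SourceResId} => SubscriberId}.

new_deallocate(Resources, SourceResId) ->
    maps:remove({kafka_subscriber_id, SourceResId}, Resources).

demo() ->
    R0 = #{kafka_client_id => client_a},
    R1 = new_allocate(R0, <<"source_1">>, sub_1),
    R2 = new_allocate(R1, <<"source_2">>, sub_2),
    %% Detaching one source is now a plain key removal; nothing else is touched.
    new_deallocate(R2, <<"source_1">>).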


@@ -120,7 +120,7 @@
 %% Allocatable resources
 -define(kafka_client_id, kafka_client_id).
--define(kafka_subscriber_ids, kafka_subscriber_ids).
+-define(kafka_subscriber_id, kafka_subscriber_id).
 
 %%-------------------------------------------------------------------------------------
 %% `emqx_resource' API
@@ -180,19 +180,27 @@ on_start(ConnectorResId, Config) ->
         installed_sources => #{}
     }}.
 
--spec on_stop(resource_id(), connector_state()) -> ok.
+-spec on_stop(connector_resource_id(), connector_state()) -> ok.
 on_stop(ConnectorResId, _State = undefined) ->
-    case emqx_resource:get_allocated_resources(ConnectorResId) of
-        #{?kafka_client_id := ClientID, ?kafka_subscriber_ids := SubscriberIds} ->
-            lists:foreach(fun stop_subscriber/1, SubscriberIds),
-            stop_client(ClientID),
+    SubscribersStopped =
+        maps:fold(
+            fun
+                (?kafka_client_id, ClientID, Acc) ->
+                    stop_client(ClientID),
+                    Acc;
+                ({?kafka_subscriber_id, _SourceResId}, SubscriberId, Acc) ->
+                    stop_subscriber(SubscriberId),
+                    Acc + 1
+            end,
+            0,
+            emqx_resource:get_allocated_resources(ConnectorResId)
+        ),
+    case SubscribersStopped > 0 of
+        true ->
             ?tp(kafka_consumer_subcriber_and_client_stopped, #{}),
             ok;
-        #{?kafka_client_id := ClientID} ->
-            stop_client(ClientID),
+        false ->
             ?tp(kafka_consumer_just_client_stopped, #{}),
-            ok;
-        _ ->
             ok
     end;
 on_stop(ConnectorResId, State) ->
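
The rewritten on_stop/2 clause above no longer pattern-matches on specific shapes of the allocated-resources map: it folds over every entry, stops the client when it sees the ?kafka_client_id key, stops a subscriber for each {?kafka_subscriber_id, SourceResId} key, and counts the subscribers so the same two trace points as before can be emitted (subscriber-and-client when at least one subscriber was stopped, just-client otherwise). For reference, the map it folds over now carries one entry per source; the values below are illustrative, not captured from a running node:

%% New layout of emqx_resource:get_allocated_resources(ConnectorResId)
%% with two sources attached (macros expanded to their atoms):
#{kafka_client_id => <<"client_a">>,
  {kafka_subscriber_id, <<"source_1">>} => <<"subscriber_1">>,
  {kafka_subscriber_id, <<"source_2">>} => <<"subscriber_2">>}

%% Old layout of the same information, for comparison:
#{kafka_client_id => <<"client_a">>,
  kafka_subscriber_ids => [<<"subscriber_1">>, <<"subscriber_2">>]}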
@@ -253,7 +261,7 @@ on_remove_channel(ConnectorResId, ConnectorState0, SourceResId) ->
         {SourceState, InstalledSources} ->
             #{subscriber_id := SubscriberId} = SourceState,
             stop_subscriber(SubscriberId),
-            deallocate_subscriber_id(ConnectorResId, SubscriberId),
+            deallocate_subscriber_id(ConnectorResId, SourceResId),
             ok;
         error ->
             InstalledSources = InstalledSources0
@@ -404,6 +412,7 @@ start_consumer(Config, ConnectorResId, SourceResId, ClientID) ->
         } = Params0
     } = Config,
     ok = ensure_consumer_supervisor_started(),
+    ?tp(kafka_consumer_sup_started, #{}),
     TopicMapping = ensure_topic_mapping(Params0),
     InitialState = #{
         key_encoding_mode => KeyEncodingMode,
@@ -450,7 +459,7 @@ start_consumer(Config, ConnectorResId, SourceResId, ClientID) ->
     %% automatically, so we should not spawn duplicate workers.
     SubscriberId = make_subscriber_id(BridgeName),
     ?tp(kafka_consumer_about_to_start_subscriber, #{}),
-    ok = allocate_subscriber_id(ConnectorResId, SubscriberId),
+    ok = allocate_subscriber_id(ConnectorResId, SourceResId, SubscriberId),
     ?tp(kafka_consumer_subscriber_allocated, #{}),
     case emqx_bridge_kafka_consumer_sup:start_child(SubscriberId, GroupSubscriberConfig) of
         {ok, _ConsumerPid} ->
@@ -689,18 +698,15 @@ infer_client_error(Error) ->
             undefined
     end.
 
-allocate_subscriber_id(ConnectorResId, SubscriberId) ->
-    AllocatedResources = emqx_resource:get_allocated_resources(ConnectorResId),
-    AllocatedSubscriberIds0 = maps:get(?kafka_subscriber_ids, AllocatedResources, []),
-    AllocatedSubscriberIds = lists:usort([SubscriberId | AllocatedSubscriberIds0]),
+allocate_subscriber_id(ConnectorResId, SourceResId, SubscriberId) ->
     ok = emqx_resource:allocate_resource(
-        ConnectorResId, ?kafka_subscriber_ids, AllocatedSubscriberIds
+        ConnectorResId,
+        {?kafka_subscriber_id, SourceResId},
+        SubscriberId
     ).
 
-deallocate_subscriber_id(ConnectorResId, SubscriberId) ->
-    AllocatedResources = emqx_resource:get_allocated_resources(ConnectorResId),
-    AllocatedSubscriberIds0 = maps:get(?kafka_subscriber_ids, AllocatedResources, []),
-    AllocatedSubscriberIds = AllocatedSubscriberIds0 -- [SubscriberId],
-    ok = emqx_resource:allocate_resource(
-        ConnectorResId, ?kafka_subscriber_ids, AllocatedSubscriberIds
+deallocate_subscriber_id(ConnectorResId, SourceResId) ->
+    ok = emqx_resource:deallocate_resource(
+        ConnectorResId,
+        {?kafka_subscriber_id, SourceResId}
     ).
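
The per-source keys also make cleanup symmetric: deallocate_subscriber_id/2 can now drop the whole {?kafka_subscriber_id, SourceResId} entry via emqx_resource:deallocate_resource/2, whereas the old code could only allocate a shrunken, possibly empty, list back under the shared key, leaving the entry itself behind. An Erlang shell illustration over plain maps (stand-ins for the real resource table):

1> Old = #{kafka_subscriber_ids => [sub_1]}.
#{kafka_subscriber_ids => [sub_1]}
2> Old#{kafka_subscriber_ids := maps:get(kafka_subscriber_ids, Old) -- [sub_1]}.
#{kafka_subscriber_ids => []}
3> maps:remove({kafka_subscriber_id, source_1}, #{{kafka_subscriber_id, source_1} => sub_1}).
#{}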


@@ -2158,7 +2158,7 @@ t_resource_manager_crash_before_subscriber_started(Config) ->
     ?check_trace(
         begin
             ?force_ordering(
-                #{?snk_kind := kafka_consumer_client_started},
+                #{?snk_kind := kafka_consumer_sup_started},
                 #{?snk_kind := will_kill_resource_manager}
             ),
             ?force_ordering(