fix(rocketmq action): we need one producer group per channel and topic

We need one producer group per channel and topic because we can have
several topics per channel due to templating.
This commit is contained in:
Kjell Winblad 2024-04-15 20:52:08 +02:00
parent 1fe92bddd0
commit f481871792
1 changed files with 8 additions and 6 deletions

View File

@ -202,7 +202,7 @@ on_stop(InstanceId, _State) ->
({_, client_id, ClientId}) -> ({_, client_id, ClientId}) ->
destory_producers_map(ClientId), destory_producers_map(ClientId),
ok = rocketmq:stop_and_delete_supervised_client(ClientId); ok = rocketmq:stop_and_delete_supervised_client(ClientId);
({_, _ChannelId, Producer}) -> ({_, _ProducerGroup, Producer}) ->
_ = rocketmq:stop_and_delete_supervised_producers(Producer) _ = rocketmq:stop_and_delete_supervised_producers(Producer)
end, end,
emqx_resource:get_allocated_resources_list(InstanceId) emqx_resource:get_allocated_resources_list(InstanceId)
@ -394,18 +394,20 @@ destory_producers_map(ClientId) ->
end. end.
get_producers(ChannelId, InstanceId, ClientId, Topic, ProducerOpts) -> get_producers(ChannelId, InstanceId, ClientId, Topic, ProducerOpts) ->
case ets:lookup(ClientId, ChannelId) of %% The topic need to be included in the name since we can have multiple
%% topics per channel due to templating.
ProducerGroup = iolist_to_binary([ChannelId, "_", Topic]),
case ets:lookup(ClientId, ProducerGroup) of
[{_, Producers}] -> [{_, Producers}] ->
Producers; Producers;
_ -> _ ->
ProducerGroup = ChannelId,
%% TODO: the name needs to be an atom but this may cause atom leak so we %% TODO: the name needs to be an atom but this may cause atom leak so we
%% should figure out a way to avoid this %% should figure out a way to avoid this
ProducerOpts2 = ProducerOpts#{name => binary_to_atom(ChannelId)}, ProducerOpts2 = ProducerOpts#{name => binary_to_atom(ProducerGroup)},
{ok, Producers} = rocketmq:ensure_supervised_producers( {ok, Producers} = rocketmq:ensure_supervised_producers(
ClientId, ProducerGroup, Topic, ProducerOpts2 ClientId, ProducerGroup, Topic, ProducerOpts2
), ),
ok = emqx_resource:allocate_resource(InstanceId, ChannelId, Producers), ok = emqx_resource:allocate_resource(InstanceId, ProducerGroup, Producers),
ets:insert(ClientId, {ChannelId, Producers}), ets:insert(ClientId, {ProducerGroup, Producers}),
Producers Producers
end. end.