From f4818717926765ad5f2846d7ee17b3f6daa16846 Mon Sep 17 00:00:00 2001 From: Kjell Winblad Date: Mon, 15 Apr 2024 20:52:08 +0200 Subject: [PATCH] fix(rocketmq action): we need one producer group per channel and topic We need one producer group per channel and topic because we can have several topics per channel due to templating. --- .../src/emqx_bridge_rocketmq_connector.erl | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/apps/emqx_bridge_rocketmq/src/emqx_bridge_rocketmq_connector.erl b/apps/emqx_bridge_rocketmq/src/emqx_bridge_rocketmq_connector.erl index 6a7b8d4bc..011d4074f 100644 --- a/apps/emqx_bridge_rocketmq/src/emqx_bridge_rocketmq_connector.erl +++ b/apps/emqx_bridge_rocketmq/src/emqx_bridge_rocketmq_connector.erl @@ -202,7 +202,7 @@ on_stop(InstanceId, _State) -> ({_, client_id, ClientId}) -> destory_producers_map(ClientId), ok = rocketmq:stop_and_delete_supervised_client(ClientId); - ({_, _ChannelId, Producer}) -> + ({_, _ProducerGroup, Producer}) -> _ = rocketmq:stop_and_delete_supervised_producers(Producer) end, emqx_resource:get_allocated_resources_list(InstanceId) @@ -394,18 +394,20 @@ destory_producers_map(ClientId) -> end. get_producers(ChannelId, InstanceId, ClientId, Topic, ProducerOpts) -> - case ets:lookup(ClientId, ChannelId) of + %% The topic need to be included in the name since we can have multiple + %% topics per channel due to templating. + ProducerGroup = iolist_to_binary([ChannelId, "_", Topic]), + case ets:lookup(ClientId, ProducerGroup) of [{_, Producers}] -> Producers; _ -> - ProducerGroup = ChannelId, %% TODO: the name needs to be an atom but this may cause atom leak so we %% should figure out a way to avoid this - ProducerOpts2 = ProducerOpts#{name => binary_to_atom(ChannelId)}, + ProducerOpts2 = ProducerOpts#{name => binary_to_atom(ProducerGroup)}, {ok, Producers} = rocketmq:ensure_supervised_producers( ClientId, ProducerGroup, Topic, ProducerOpts2 ), - ok = emqx_resource:allocate_resource(InstanceId, ChannelId, Producers), - ets:insert(ClientId, {ChannelId, Producers}), + ok = emqx_resource:allocate_resource(InstanceId, ProducerGroup, Producers), + ets:insert(ClientId, {ProducerGroup, Producers}), Producers end.