diff --git a/apps/emqx_bridge_rocketmq/src/emqx_bridge_rocketmq_connector.erl b/apps/emqx_bridge_rocketmq/src/emqx_bridge_rocketmq_connector.erl index 6a7b8d4bc..011d4074f 100644 --- a/apps/emqx_bridge_rocketmq/src/emqx_bridge_rocketmq_connector.erl +++ b/apps/emqx_bridge_rocketmq/src/emqx_bridge_rocketmq_connector.erl @@ -202,7 +202,7 @@ on_stop(InstanceId, _State) -> ({_, client_id, ClientId}) -> destory_producers_map(ClientId), ok = rocketmq:stop_and_delete_supervised_client(ClientId); - ({_, _ChannelId, Producer}) -> + ({_, _ProducerGroup, Producer}) -> _ = rocketmq:stop_and_delete_supervised_producers(Producer) end, emqx_resource:get_allocated_resources_list(InstanceId) @@ -394,18 +394,20 @@ destory_producers_map(ClientId) -> end. get_producers(ChannelId, InstanceId, ClientId, Topic, ProducerOpts) -> - case ets:lookup(ClientId, ChannelId) of + %% The topic need to be included in the name since we can have multiple + %% topics per channel due to templating. + ProducerGroup = iolist_to_binary([ChannelId, "_", Topic]), + case ets:lookup(ClientId, ProducerGroup) of [{_, Producers}] -> Producers; _ -> - ProducerGroup = ChannelId, %% TODO: the name needs to be an atom but this may cause atom leak so we %% should figure out a way to avoid this - ProducerOpts2 = ProducerOpts#{name => binary_to_atom(ChannelId)}, + ProducerOpts2 = ProducerOpts#{name => binary_to_atom(ProducerGroup)}, {ok, Producers} = rocketmq:ensure_supervised_producers( ClientId, ProducerGroup, Topic, ProducerOpts2 ), - ok = emqx_resource:allocate_resource(InstanceId, ChannelId, Producers), - ets:insert(ClientId, {ChannelId, Producers}), + ok = emqx_resource:allocate_resource(InstanceId, ProducerGroup, Producers), + ets:insert(ClientId, {ProducerGroup, Producers}), Producers end.