diff --git a/apps/emqx_bridge_rocketmq/src/emqx_bridge_rocketmq_connector.erl b/apps/emqx_bridge_rocketmq/src/emqx_bridge_rocketmq_connector.erl index bd5154df5..6a7b8d4bc 100644 --- a/apps/emqx_bridge_rocketmq/src/emqx_bridge_rocketmq_connector.erl +++ b/apps/emqx_bridge_rocketmq/src/emqx_bridge_rocketmq_connector.erl @@ -139,14 +139,13 @@ on_add_channel( ChannelId, ChannelConfig ) -> - {ok, ChannelState} = create_channel_state(ChannelId, ChannelConfig, ACLInfo), + {ok, ChannelState} = create_channel_state(ChannelConfig, ACLInfo), NewInstalledChannels = maps:put(ChannelId, ChannelState, InstalledChannels), %% Update state NewState = OldState#{installed_channels => NewInstalledChannels}, {ok, NewState}. create_channel_state( - ChannelId, #{parameters := Conf} = _ChannelConfig, ACLInfo ) -> @@ -155,7 +154,7 @@ create_channel_state( sync_timeout := SyncTimeout } = Conf, TopicTks = emqx_placeholder:preproc_tmpl(Topic), - ProducerOpts = make_producer_opts(ChannelId, Conf, ACLInfo), + ProducerOpts = make_producer_opts(Conf, ACLInfo), Templates = parse_template(Conf), State = #{ topic => Topic, @@ -203,7 +202,7 @@ on_stop(InstanceId, _State) -> ({_, client_id, ClientId}) -> destory_producers_map(ClientId), ok = rocketmq:stop_and_delete_supervised_client(ClientId); - ({_, _Topic, Producer}) -> + ({_, _ChannelId, Producer}) -> _ = rocketmq:stop_and_delete_supervised_producers(Producer) end, emqx_resource:get_allocated_resources_list(InstanceId) @@ -259,7 +258,7 @@ do_query( Data = apply_template(Query, Templates), Result = safe_do_produce( - InstanceId, QueryFunc, ClientId, TopicKey, Data, ProducerOpts, RequestTimeout + ChannelId, InstanceId, QueryFunc, ClientId, TopicKey, Data, ProducerOpts, RequestTimeout ), case Result of {error, Reason} -> @@ -285,9 +284,11 @@ do_query( get_channel_id({ChannelId, _}) -> ChannelId; get_channel_id([{ChannelId, _} | _]) -> ChannelId. -safe_do_produce(InstanceId, QueryFunc, ClientId, TopicKey, Data, ProducerOpts, RequestTimeout) -> +safe_do_produce( + ChannelId, InstanceId, QueryFunc, ClientId, TopicKey, Data, ProducerOpts, RequestTimeout +) -> try - Producers = get_producers(InstanceId, ClientId, TopicKey, ProducerOpts), + Producers = get_producers(ChannelId, InstanceId, ClientId, TopicKey, ProducerOpts), produce(InstanceId, QueryFunc, Producers, Data, RequestTimeout) catch _Type:Reason -> @@ -350,7 +351,6 @@ is_sensitive_key(_) -> false. make_producer_opts( - ChannelId, #{ send_buffer := SendBuff, refresh_interval := RefreshInterval @@ -358,9 +358,6 @@ make_producer_opts( ACLInfo ) -> #{ - %% TODO: the name needs to be an atom but this may cause atom leak so we - %% should figure out a way to avoid this - name => binary_to_atom(ChannelId), tcp_opts => [{sndbuf, SendBuff}], ref_topic_route_interval => RefreshInterval, acl_info => emqx_secret:wrap(ACLInfo) @@ -396,16 +393,19 @@ destory_producers_map(ClientId) -> ets:delete(Tid) end. -get_producers(InstanceId, ClientId, Topic, ProducerOpts) -> - case ets:lookup(ClientId, Topic) of +get_producers(ChannelId, InstanceId, ClientId, Topic, ProducerOpts) -> + case ets:lookup(ClientId, ChannelId) of [{_, Producers}] -> Producers; _ -> - ProducerGroup = iolist_to_binary([atom_to_list(ClientId), "_", Topic]), + ProducerGroup = ChannelId, + %% TODO: the name needs to be an atom but this may cause atom leak so we + %% should figure out a way to avoid this + ProducerOpts2 = ProducerOpts#{name => binary_to_atom(ChannelId)}, {ok, Producers} = rocketmq:ensure_supervised_producers( - ClientId, ProducerGroup, Topic, ProducerOpts + ClientId, ProducerGroup, Topic, ProducerOpts2 ), - ok = emqx_resource:allocate_resource(InstanceId, Topic, Producers), - ets:insert(ClientId, {Topic, Producers}), + ok = emqx_resource:allocate_resource(InstanceId, ChannelId, Producers), + ets:insert(ClientId, {ChannelId, Producers}), Producers end.