From f3e8037e0f830d2c42cbf0ca74ec97e8e86e5045 Mon Sep 17 00:00:00 2001 From: firest Date: Wed, 22 May 2024 16:29:38 +0800 Subject: [PATCH] fix(rocketmq): fix namespace error for RocketMQ --- .../src/emqx_bridge_rocketmq_connector.erl | 37 ++++++++++++------- 1 file changed, 23 insertions(+), 14 deletions(-) diff --git a/apps/emqx_bridge_rocketmq/src/emqx_bridge_rocketmq_connector.erl b/apps/emqx_bridge_rocketmq/src/emqx_bridge_rocketmq_connector.erl index 4aeb6e772..5c62fd622 100644 --- a/apps/emqx_bridge_rocketmq/src/emqx_bridge_rocketmq_connector.erl +++ b/apps/emqx_bridge_rocketmq/src/emqx_bridge_rocketmq_connector.erl @@ -112,11 +112,13 @@ on_start( ), ClientId = client_id(InstanceId), ACLInfo = acl_info(AccessKey, SecretKey, SecurityToken), - ClientCfg = namespace(#{acl_info => ACLInfo}, Config), + Namespace = maps:get(namespace, Config, <<>>), + ClientCfg = #{acl_info => ACLInfo, namespace => Namespace}, State = #{ client_id => ClientId, acl_info => ACLInfo, + namespace => Namespace, installed_channels => #{} }, @@ -139,12 +141,13 @@ on_add_channel( _InstId, #{ installed_channels := InstalledChannels, + namespace := Namespace, acl_info := ACLInfo } = OldState, ChannelId, ChannelConfig ) -> - {ok, ChannelState} = create_channel_state(ChannelConfig, ACLInfo), + {ok, ChannelState} = create_channel_state(ChannelConfig, ACLInfo, Namespace), NewInstalledChannels = maps:put(ChannelId, ChannelState, InstalledChannels), %% Update state NewState = OldState#{installed_channels => NewInstalledChannels}, @@ -152,16 +155,18 @@ on_add_channel( create_channel_state( #{parameters := Conf} = _ChannelConfig, - ACLInfo + ACLInfo, + Namespace ) -> #{ topic := Topic, - sync_timeout := SyncTimeout + sync_timeout := SyncTimeout, + strategy := Strategy } = Conf, TopicTks = emqx_placeholder:preproc_tmpl(Topic), - ProducerOpts = make_producer_opts(Conf, ACLInfo), + ProducerOpts = make_producer_opts(Conf, ACLInfo, Namespace, Strategy), Templates = parse_template(Conf), - DispatchStrategy = parse_dispatch_strategy(Conf), + DispatchStrategy = parse_dispatch_strategy(Strategy), State = #{ topic => Topic, topic_tokens => TopicTks, @@ -330,11 +335,11 @@ parse_template([], Templates) -> Templates. %% returns a procedure to generate the produce context -parse_dispatch_strategy(#{strategy := roundrobin}) -> +parse_dispatch_strategy(roundrobin) -> fun(_) -> #{} end; -parse_dispatch_strategy(#{strategy := Template}) -> +parse_dispatch_strategy(Template) -> Tokens = emqx_placeholder:preproc_tmpl(Template), fun(Msg) -> #{ @@ -400,12 +405,20 @@ make_producer_opts( send_buffer := SendBuff, refresh_interval := RefreshInterval }, - ACLInfo + ACLInfo, + Namespace, + Strategy ) -> #{ tcp_opts => [{sndbuf, SendBuff}], ref_topic_route_interval => RefreshInterval, - acl_info => emqx_secret:wrap(ACLInfo) + acl_info => emqx_secret:wrap(ACLInfo), + namespace => Namespace, + partitioner => + case Strategy of + roundrobin -> roundrobin; + _ -> key_dispatch + end }. acl_info(<<>>, _, _) -> @@ -424,10 +437,6 @@ acl_info(AccessKey, SecretKey, SecurityToken) when is_binary(AccessKey) -> acl_info(_, _, _) -> #{}. -namespace(ClientCfg, Config) -> - Namespace = maps:get(namespace, Config, <<>>), - ClientCfg#{namespace => Namespace}. - create_producers_map(ClientId) -> _ = ets:new(ClientId, [public, named_table, {read_concurrency, true}]), ok.