Merge pull request #13085 from lafirest/fix/rocket_namespace

fix(rocketmq): fix namespace error for RocketMQ
This commit is contained in:
lafirest 2024-05-23 10:39:30 +08:00 committed by GitHub
commit b1076221b0
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
1 changed files with 23 additions and 14 deletions

View File

@ -112,11 +112,13 @@ on_start(
),
ClientId = client_id(InstanceId),
ACLInfo = acl_info(AccessKey, SecretKey, SecurityToken),
ClientCfg = namespace(#{acl_info => ACLInfo}, Config),
Namespace = maps:get(namespace, Config, <<>>),
ClientCfg = #{acl_info => ACLInfo, namespace => Namespace},
State = #{
client_id => ClientId,
acl_info => ACLInfo,
namespace => Namespace,
installed_channels => #{}
},
@ -139,12 +141,13 @@ on_add_channel(
_InstId,
#{
installed_channels := InstalledChannels,
namespace := Namespace,
acl_info := ACLInfo
} = OldState,
ChannelId,
ChannelConfig
) ->
{ok, ChannelState} = create_channel_state(ChannelConfig, ACLInfo),
{ok, ChannelState} = create_channel_state(ChannelConfig, ACLInfo, Namespace),
NewInstalledChannels = maps:put(ChannelId, ChannelState, InstalledChannels),
%% Update state
NewState = OldState#{installed_channels => NewInstalledChannels},
@ -152,16 +155,18 @@ on_add_channel(
create_channel_state(
#{parameters := Conf} = _ChannelConfig,
ACLInfo
ACLInfo,
Namespace
) ->
#{
topic := Topic,
sync_timeout := SyncTimeout
sync_timeout := SyncTimeout,
strategy := Strategy
} = Conf,
TopicTks = emqx_placeholder:preproc_tmpl(Topic),
ProducerOpts = make_producer_opts(Conf, ACLInfo),
ProducerOpts = make_producer_opts(Conf, ACLInfo, Namespace, Strategy),
Templates = parse_template(Conf),
DispatchStrategy = parse_dispatch_strategy(Conf),
DispatchStrategy = parse_dispatch_strategy(Strategy),
State = #{
topic => Topic,
topic_tokens => TopicTks,
@ -330,11 +335,11 @@ parse_template([], Templates) ->
Templates.
%% returns a procedure to generate the produce context
parse_dispatch_strategy(#{strategy := roundrobin}) ->
parse_dispatch_strategy(roundrobin) ->
fun(_) ->
#{}
end;
parse_dispatch_strategy(#{strategy := Template}) ->
parse_dispatch_strategy(Template) ->
Tokens = emqx_placeholder:preproc_tmpl(Template),
fun(Msg) ->
#{
@ -400,12 +405,20 @@ make_producer_opts(
send_buffer := SendBuff,
refresh_interval := RefreshInterval
},
ACLInfo
ACLInfo,
Namespace,
Strategy
) ->
#{
tcp_opts => [{sndbuf, SendBuff}],
ref_topic_route_interval => RefreshInterval,
acl_info => emqx_secret:wrap(ACLInfo)
acl_info => emqx_secret:wrap(ACLInfo),
namespace => Namespace,
partitioner =>
case Strategy of
roundrobin -> roundrobin;
_ -> key_dispatch
end
}.
acl_info(<<>>, _, _) ->
@ -424,10 +437,6 @@ acl_info(AccessKey, SecretKey, SecurityToken) when is_binary(AccessKey) ->
acl_info(_, _, _) ->
#{}.
namespace(ClientCfg, Config) ->
Namespace = maps:get(namespace, Config, <<>>),
ClientCfg#{namespace => Namespace}.
create_producers_map(ClientId) ->
_ = ets:new(ClientId, [public, named_table, {read_concurrency, true}]),
ok.