Merge pull request #12899 from lafirest/fix/rocmq

feat(rocketmq): add support for namespace and key dispatch strategy
This commit is contained in:
lafirest 2024-04-22 21:20:44 +08:00 committed by GitHub
commit a81ca359fc
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 80 additions and 16 deletions

View File

@ -163,10 +163,12 @@ fields(action_parameters) ->
{template, {template,
mk( mk(
emqx_schema:template(), emqx_schema:template(),
#{ #{desc => ?DESC("template"), default => ?DEFAULT_TEMPLATE}
desc => ?DESC("template"), )},
default => ?DEFAULT_TEMPLATE {strategy,
} mk(
hoconsc:union([roundrobin, binary()]),
#{desc => ?DESC("strategy"), default => roundrobin}
)} )}
] ++ emqx_bridge_rocketmq_connector:fields(config), ] ++ emqx_bridge_rocketmq_connector:fields(config),
lists:foldl( lists:foldl(
@ -176,6 +178,7 @@ fields(action_parameters) ->
Parameters, Parameters,
[ [
servers, servers,
namespace,
pool_size, pool_size,
auto_reconnect, auto_reconnect,
access_key, access_key,
@ -215,6 +218,11 @@ fields("config") ->
mk( mk(
binary(), binary(),
#{desc => ?DESC("local_topic"), required => false} #{desc => ?DESC("local_topic"), required => false}
)},
{strategy,
mk(
hoconsc:union([roundrobin, binary()]),
#{desc => ?DESC("strategy"), default => roundrobin}
)} )}
] ++ emqx_resource_schema:fields("resource_opts") ++ ] ++ emqx_resource_schema:fields("resource_opts") ++
emqx_bridge_rocketmq_connector:fields(config); emqx_bridge_rocketmq_connector:fields(config);

View File

@ -45,6 +45,11 @@ roots() ->
fields(config) -> fields(config) ->
[ [
{servers, servers()}, {servers, servers()},
{namespace,
mk(
binary(),
#{required => false, desc => ?DESC(namespace)}
)},
{topic, {topic,
mk( mk(
emqx_schema:template(), emqx_schema:template(),
@ -107,7 +112,7 @@ on_start(
), ),
ClientId = client_id(InstanceId), ClientId = client_id(InstanceId),
ACLInfo = acl_info(AccessKey, SecretKey, SecurityToken), ACLInfo = acl_info(AccessKey, SecretKey, SecurityToken),
ClientCfg = #{acl_info => ACLInfo}, ClientCfg = namespace(#{acl_info => ACLInfo}, Config),
State = #{ State = #{
client_id => ClientId, client_id => ClientId,
@ -156,10 +161,12 @@ create_channel_state(
TopicTks = emqx_placeholder:preproc_tmpl(Topic), TopicTks = emqx_placeholder:preproc_tmpl(Topic),
ProducerOpts = make_producer_opts(Conf, ACLInfo), ProducerOpts = make_producer_opts(Conf, ACLInfo),
Templates = parse_template(Conf), Templates = parse_template(Conf),
DispatchStrategy = parse_dispatch_strategy(Conf),
State = #{ State = #{
topic => Topic, topic => Topic,
topic_tokens => TopicTks, topic_tokens => TopicTks,
templates => Templates, templates => Templates,
dispatch_strategy => DispatchStrategy,
sync_timeout => SyncTimeout, sync_timeout => SyncTimeout,
acl_info => ACLInfo, acl_info => ACLInfo,
producers_opts => ProducerOpts producers_opts => ProducerOpts
@ -250,12 +257,13 @@ do_query(
#{ #{
topic_tokens := TopicTks, topic_tokens := TopicTks,
templates := Templates, templates := Templates,
dispatch_strategy := DispatchStrategy,
sync_timeout := RequestTimeout, sync_timeout := RequestTimeout,
producers_opts := ProducerOpts producers_opts := ProducerOpts
} = maps:get(ChannelId, Channels), } = maps:get(ChannelId, Channels),
TopicKey = get_topic_key(Query, TopicTks), TopicKey = get_topic_key(Query, TopicTks),
Data = apply_template(Query, Templates), Data = apply_template(Query, Templates, DispatchStrategy),
Result = safe_do_produce( Result = safe_do_produce(
ChannelId, InstanceId, QueryFunc, ClientId, TopicKey, Data, ProducerOpts, RequestTimeout ChannelId, InstanceId, QueryFunc, ClientId, TopicKey, Data, ProducerOpts, RequestTimeout
@ -317,24 +325,57 @@ parse_template([{Key, H} | T], Templates) ->
parse_template([], Templates) -> parse_template([], Templates) ->
Templates. Templates.
%% returns a procedure to generate the produce context
parse_dispatch_strategy(#{strategy := roundrobin}) ->
fun(_) ->
#{}
end;
parse_dispatch_strategy(#{strategy := Template}) ->
Tokens = emqx_placeholder:preproc_tmpl(Template),
fun(Msg) ->
#{
key =>
case emqx_placeholder:proc_tmpl(Tokens, Msg) of
<<"undefined">> ->
%% Since the key may be absent on some kinds of events (ex:
%% `topic' is absent in `client.disconnected'), and this key is
%% used for routing, we generate a random key when it's absent to
%% better distribute the load, effectively making it `random'
%% dispatch if the key is absent and we are using `key_dispatch'.
%% Otherwise, it'll be deterministic.
emqx_guid:gen();
Key ->
Key
end
}
end.
get_topic_key({_, Msg}, TopicTks) -> get_topic_key({_, Msg}, TopicTks) ->
emqx_placeholder:proc_tmpl(TopicTks, Msg); emqx_placeholder:proc_tmpl(TopicTks, Msg);
get_topic_key([Query | _], TopicTks) -> get_topic_key([Query | _], TopicTks) ->
get_topic_key(Query, TopicTks). get_topic_key(Query, TopicTks).
apply_template({Key, Msg} = _Req, Templates) -> %% return a message data and its context,
%% {binary(), rocketmq_producers:produce_context()})
apply_template({Key, Msg} = _Req, Templates, DispatchStrategy) ->
{
case maps:get(Key, Templates, undefined) of
undefined ->
emqx_utils_json:encode(Msg);
Template ->
emqx_placeholder:proc_tmpl(Template, Msg)
end,
DispatchStrategy(Msg)
};
apply_template([{Key, _} | _] = Reqs, Templates, DispatchStrategy) ->
case maps:get(Key, Templates, undefined) of case maps:get(Key, Templates, undefined) of
undefined -> undefined ->
emqx_utils_json:encode(Msg); [{emqx_utils_json:encode(Msg), DispatchStrategy(Msg)} || {_, Msg} <- Reqs];
Template -> Template ->
emqx_placeholder:proc_tmpl(Template, Msg) [
end; {emqx_placeholder:proc_tmpl(Template, Msg), DispatchStrategy(Msg)}
apply_template([{Key, _} | _] = Reqs, Templates) -> || {_, Msg} <- Reqs
case maps:get(Key, Templates, undefined) of ]
undefined ->
[emqx_utils_json:encode(Msg) || {_, Msg} <- Reqs];
Template ->
[emqx_placeholder:proc_tmpl(Template, Msg) || {_, Msg} <- Reqs]
end. end.
client_id(ResourceId) -> client_id(ResourceId) ->
@ -379,6 +420,10 @@ acl_info(AccessKey, SecretKey, SecurityToken) when is_binary(AccessKey) ->
acl_info(_, _, _) -> acl_info(_, _, _) ->
#{}. #{}.
namespace(ClientCfg, Config) ->
Namespace = maps:get(namespace, Config, <<>>),
ClientCfg#{namespace => Namespace}.
create_producers_map(ClientId) -> create_producers_map(ClientId) ->
_ = ets:new(ClientId, [public, named_table, {read_concurrency, true}]), _ = ets:new(ClientId, [public, named_table, {read_concurrency, true}]),
ok. ok.

View File

@ -0,0 +1 @@
Added support for namespace and key dispatch strategy.

View File

@ -59,4 +59,7 @@ config_connector.desc:
config_connector.label: config_connector.label:
"""RocketMQ Client Configuration""" """RocketMQ Client Configuration"""
strategy.desc:
"""Producer key dispatch strategy, the default is `roundrobin`, also supports placeholders, such as: `clientid`, `messageid`, `username`."""
} }

View File

@ -50,4 +50,10 @@ topic.desc:
topic.label: topic.label:
"""RocketMQ Topic""" """RocketMQ Topic"""
namespace.desc:
"""The namespace field MUST be set if you are using the RocketMQ service in
aliyun cloud and also the namespace is enabled,
or if you have configured a namespace in your RocketMQ server.
For RocketMQ in aliyun cloud, the namespace is the instance ID."""
} }

View File

@ -306,3 +306,4 @@ elasticsearch
ElasticSearch ElasticSearch
doc_as_upsert doc_as_upsert
upsert upsert
aliyun