From d5cdc07eabd31eacc81083728b158a437077653a Mon Sep 17 00:00:00 2001 From: firest Date: Mon, 22 Apr 2024 13:14:11 +0800 Subject: [PATCH 1/2] feat(rocketmq): add support for namespace and key dispatch strategy --- .../src/emqx_bridge_rocketmq.erl | 16 +++-- .../src/emqx_bridge_rocketmq_connector.erl | 69 +++++++++++++++---- rel/i18n/emqx_bridge_rocketmq.hocon | 3 + rel/i18n/emqx_bridge_rocketmq_connector.hocon | 6 ++ 4 files changed, 78 insertions(+), 16 deletions(-) diff --git a/apps/emqx_bridge_rocketmq/src/emqx_bridge_rocketmq.erl b/apps/emqx_bridge_rocketmq/src/emqx_bridge_rocketmq.erl index 750993e9a..f7e6d9b57 100644 --- a/apps/emqx_bridge_rocketmq/src/emqx_bridge_rocketmq.erl +++ b/apps/emqx_bridge_rocketmq/src/emqx_bridge_rocketmq.erl @@ -163,10 +163,12 @@ fields(action_parameters) -> {template, mk( emqx_schema:template(), - #{ - desc => ?DESC("template"), - default => ?DEFAULT_TEMPLATE - } + #{desc => ?DESC("template"), default => ?DEFAULT_TEMPLATE} + )}, + {strategy, + mk( + hoconsc:union([roundrobin, binary()]), + #{desc => ?DESC("strategy"), default => roundrobin} )} ] ++ emqx_bridge_rocketmq_connector:fields(config), lists:foldl( @@ -176,6 +178,7 @@ fields(action_parameters) -> Parameters, [ servers, + namespace, pool_size, auto_reconnect, access_key, @@ -215,6 +218,11 @@ fields("config") -> mk( binary(), #{desc => ?DESC("local_topic"), required => false} + )}, + {strategy, + mk( + hoconsc:union([roundrobin, binary()]), + #{desc => ?DESC("strategy"), default => roundrobin} )} ] ++ emqx_resource_schema:fields("resource_opts") ++ emqx_bridge_rocketmq_connector:fields(config); diff --git a/apps/emqx_bridge_rocketmq/src/emqx_bridge_rocketmq_connector.erl b/apps/emqx_bridge_rocketmq/src/emqx_bridge_rocketmq_connector.erl index 0bea5a8ff..0141c3fd0 100644 --- a/apps/emqx_bridge_rocketmq/src/emqx_bridge_rocketmq_connector.erl +++ b/apps/emqx_bridge_rocketmq/src/emqx_bridge_rocketmq_connector.erl @@ -45,6 +45,11 @@ roots() -> fields(config) -> [ {servers, servers()}, + {namespace, + mk( + binary(), + #{required => false, desc => ?DESC(namespace)} + )}, {topic, mk( emqx_schema:template(), @@ -107,7 +112,7 @@ on_start( ), ClientId = client_id(InstanceId), ACLInfo = acl_info(AccessKey, SecretKey, SecurityToken), - ClientCfg = #{acl_info => ACLInfo}, + ClientCfg = namespace(#{acl_info => ACLInfo}, Config), State = #{ client_id => ClientId, @@ -156,10 +161,12 @@ create_channel_state( TopicTks = emqx_placeholder:preproc_tmpl(Topic), ProducerOpts = make_producer_opts(Conf, ACLInfo), Templates = parse_template(Conf), + DispatchStrategy = parse_dispatch_strategy(Conf), State = #{ topic => Topic, topic_tokens => TopicTks, templates => Templates, + dispatch_strategy => DispatchStrategy, sync_timeout => SyncTimeout, acl_info => ACLInfo, producers_opts => ProducerOpts @@ -250,12 +257,13 @@ do_query( #{ topic_tokens := TopicTks, templates := Templates, + dispatch_strategy := DispatchStrategy, sync_timeout := RequestTimeout, producers_opts := ProducerOpts } = maps:get(ChannelId, Channels), TopicKey = get_topic_key(Query, TopicTks), - Data = apply_template(Query, Templates), + Data = apply_template(Query, Templates, DispatchStrategy), Result = safe_do_produce( InstanceId, QueryFunc, ClientId, TopicKey, Data, ProducerOpts, RequestTimeout @@ -315,24 +323,57 @@ parse_template([{Key, H} | T], Templates) -> parse_template([], Templates) -> Templates. +%% returns a procedure to generate the produce context +parse_dispatch_strategy(#{strategy := roundrobin}) -> + fun(_) -> + #{} + end; +parse_dispatch_strategy(#{strategy := Template}) -> + Tokens = emqx_placeholder:preproc_tmpl(Template), + fun(Msg) -> + #{ + key => + case emqx_placeholder:proc_tmpl(Tokens, Msg) of + <<"undefined">> -> + %% Since the key may be absent on some kinds of events (ex: + %% `topic' is absent in `client.disconnected'), and this key is + %% used for routing, we generate a random key when it's absent to + %% better distribute the load, effectively making it `random' + %% dispatch if the key is absent and we are using `key_dispatch'. + %% Otherwise, it'll be deterministic. + emqx_guid:gen(); + Key -> + Key + end + } + end. + get_topic_key({_, Msg}, TopicTks) -> emqx_placeholder:proc_tmpl(TopicTks, Msg); get_topic_key([Query | _], TopicTks) -> get_topic_key(Query, TopicTks). -apply_template({Key, Msg} = _Req, Templates) -> +%% return a message data and its context, +%% {binary(), rocketmq_producers:produce_context()}) +apply_template({Key, Msg} = _Req, Templates, DispatchStrategy) -> + { + case maps:get(Key, Templates, undefined) of + undefined -> + emqx_utils_json:encode(Msg); + Template -> + emqx_placeholder:proc_tmpl(Template, Msg) + end, + DispatchStrategy(Msg) + }; +apply_template([{Key, _} | _] = Reqs, Templates, DispatchStrategy) -> case maps:get(Key, Templates, undefined) of undefined -> - emqx_utils_json:encode(Msg); + [{emqx_utils_json:encode(Msg), DispatchStrategy(Msg)} || {_, Msg} <- Reqs]; Template -> - emqx_placeholder:proc_tmpl(Template, Msg) - end; -apply_template([{Key, _} | _] = Reqs, Templates) -> - case maps:get(Key, Templates, undefined) of - undefined -> - [emqx_utils_json:encode(Msg) || {_, Msg} <- Reqs]; - Template -> - [emqx_placeholder:proc_tmpl(Template, Msg) || {_, Msg} <- Reqs] + [ + {emqx_placeholder:proc_tmpl(Template, Msg), DispatchStrategy(Msg)} + || {_, Msg} <- Reqs + ] end. client_id(ResourceId) -> @@ -377,6 +418,10 @@ acl_info(AccessKey, SecretKey, SecurityToken) when is_binary(AccessKey) -> acl_info(_, _, _) -> #{}. +namespace(ClientCfg, Config) -> + Namespace = maps:get(namespace, Config, <<>>), + ClientCfg#{namespace => Namespace}. + create_producers_map(ClientId) -> _ = ets:new(ClientId, [public, named_table, {read_concurrency, true}]), ok. diff --git a/rel/i18n/emqx_bridge_rocketmq.hocon b/rel/i18n/emqx_bridge_rocketmq.hocon index b6bb3aad6..fe6ca8c8d 100644 --- a/rel/i18n/emqx_bridge_rocketmq.hocon +++ b/rel/i18n/emqx_bridge_rocketmq.hocon @@ -59,4 +59,7 @@ config_connector.desc: config_connector.label: """RocketMQ Client Configuration""" +strategy.desc: +"""Producer key dispatch strategy, the default is `roundrobin`, also supports placeholders, such as: `clientid`, `messageid`, `username`.""" + } diff --git a/rel/i18n/emqx_bridge_rocketmq_connector.hocon b/rel/i18n/emqx_bridge_rocketmq_connector.hocon index b13e015c2..b65ce5405 100644 --- a/rel/i18n/emqx_bridge_rocketmq_connector.hocon +++ b/rel/i18n/emqx_bridge_rocketmq_connector.hocon @@ -50,4 +50,10 @@ topic.desc: topic.label: """RocketMQ Topic""" +namespace.desc: +"""The namespace field MUST be set if you are using the RocketMQ service in +aliyun cloud and also the namespace is enabled, +or if you have configured a namespace in your RocketMQ server. +For RocketMQ in aliyun cloud, the namespace is the instance ID.""" + } From 617b2137b4f0bddde8eb60bd4e1d3f046d924fbf Mon Sep 17 00:00:00 2001 From: firest Date: Fri, 19 Apr 2024 15:16:56 +0800 Subject: [PATCH 2/2] chore: update changes --- changes/ee/feat-12899.en.md | 1 + scripts/spellcheck/dicts/emqx.txt | 1 + 2 files changed, 2 insertions(+) create mode 100644 changes/ee/feat-12899.en.md diff --git a/changes/ee/feat-12899.en.md b/changes/ee/feat-12899.en.md new file mode 100644 index 000000000..8d5b62bcc --- /dev/null +++ b/changes/ee/feat-12899.en.md @@ -0,0 +1 @@ +Added support for namespace and key dispatch strategy. diff --git a/scripts/spellcheck/dicts/emqx.txt b/scripts/spellcheck/dicts/emqx.txt index c7c266434..d68c85716 100644 --- a/scripts/spellcheck/dicts/emqx.txt +++ b/scripts/spellcheck/dicts/emqx.txt @@ -306,3 +306,4 @@ elasticsearch ElasticSearch doc_as_upsert upsert +aliyun