feat(rocketmq): add support for namespace and key dispatch strategy
This commit is contained in:
parent
6e99f01ecd
commit
d5cdc07eab
|
@ -163,10 +163,12 @@ fields(action_parameters) ->
|
|||
{template,
|
||||
mk(
|
||||
emqx_schema:template(),
|
||||
#{
|
||||
desc => ?DESC("template"),
|
||||
default => ?DEFAULT_TEMPLATE
|
||||
}
|
||||
#{desc => ?DESC("template"), default => ?DEFAULT_TEMPLATE}
|
||||
)},
|
||||
{strategy,
|
||||
mk(
|
||||
hoconsc:union([roundrobin, binary()]),
|
||||
#{desc => ?DESC("strategy"), default => roundrobin}
|
||||
)}
|
||||
] ++ emqx_bridge_rocketmq_connector:fields(config),
|
||||
lists:foldl(
|
||||
|
@ -176,6 +178,7 @@ fields(action_parameters) ->
|
|||
Parameters,
|
||||
[
|
||||
servers,
|
||||
namespace,
|
||||
pool_size,
|
||||
auto_reconnect,
|
||||
access_key,
|
||||
|
@ -215,6 +218,11 @@ fields("config") ->
|
|||
mk(
|
||||
binary(),
|
||||
#{desc => ?DESC("local_topic"), required => false}
|
||||
)},
|
||||
{strategy,
|
||||
mk(
|
||||
hoconsc:union([roundrobin, binary()]),
|
||||
#{desc => ?DESC("strategy"), default => roundrobin}
|
||||
)}
|
||||
] ++ emqx_resource_schema:fields("resource_opts") ++
|
||||
emqx_bridge_rocketmq_connector:fields(config);
|
||||
|
|
|
@ -45,6 +45,11 @@ roots() ->
|
|||
fields(config) ->
|
||||
[
|
||||
{servers, servers()},
|
||||
{namespace,
|
||||
mk(
|
||||
binary(),
|
||||
#{required => false, desc => ?DESC(namespace)}
|
||||
)},
|
||||
{topic,
|
||||
mk(
|
||||
emqx_schema:template(),
|
||||
|
@ -107,7 +112,7 @@ on_start(
|
|||
),
|
||||
ClientId = client_id(InstanceId),
|
||||
ACLInfo = acl_info(AccessKey, SecretKey, SecurityToken),
|
||||
ClientCfg = #{acl_info => ACLInfo},
|
||||
ClientCfg = namespace(#{acl_info => ACLInfo}, Config),
|
||||
|
||||
State = #{
|
||||
client_id => ClientId,
|
||||
|
@ -156,10 +161,12 @@ create_channel_state(
|
|||
TopicTks = emqx_placeholder:preproc_tmpl(Topic),
|
||||
ProducerOpts = make_producer_opts(Conf, ACLInfo),
|
||||
Templates = parse_template(Conf),
|
||||
DispatchStrategy = parse_dispatch_strategy(Conf),
|
||||
State = #{
|
||||
topic => Topic,
|
||||
topic_tokens => TopicTks,
|
||||
templates => Templates,
|
||||
dispatch_strategy => DispatchStrategy,
|
||||
sync_timeout => SyncTimeout,
|
||||
acl_info => ACLInfo,
|
||||
producers_opts => ProducerOpts
|
||||
|
@ -250,12 +257,13 @@ do_query(
|
|||
#{
|
||||
topic_tokens := TopicTks,
|
||||
templates := Templates,
|
||||
dispatch_strategy := DispatchStrategy,
|
||||
sync_timeout := RequestTimeout,
|
||||
producers_opts := ProducerOpts
|
||||
} = maps:get(ChannelId, Channels),
|
||||
|
||||
TopicKey = get_topic_key(Query, TopicTks),
|
||||
Data = apply_template(Query, Templates),
|
||||
Data = apply_template(Query, Templates, DispatchStrategy),
|
||||
|
||||
Result = safe_do_produce(
|
||||
InstanceId, QueryFunc, ClientId, TopicKey, Data, ProducerOpts, RequestTimeout
|
||||
|
@ -315,24 +323,57 @@ parse_template([{Key, H} | T], Templates) ->
|
|||
parse_template([], Templates) ->
|
||||
Templates.
|
||||
|
||||
%% returns a procedure to generate the produce context
|
||||
parse_dispatch_strategy(#{strategy := roundrobin}) ->
|
||||
fun(_) ->
|
||||
#{}
|
||||
end;
|
||||
parse_dispatch_strategy(#{strategy := Template}) ->
|
||||
Tokens = emqx_placeholder:preproc_tmpl(Template),
|
||||
fun(Msg) ->
|
||||
#{
|
||||
key =>
|
||||
case emqx_placeholder:proc_tmpl(Tokens, Msg) of
|
||||
<<"undefined">> ->
|
||||
%% Since the key may be absent on some kinds of events (ex:
|
||||
%% `topic' is absent in `client.disconnected'), and this key is
|
||||
%% used for routing, we generate a random key when it's absent to
|
||||
%% better distribute the load, effectively making it `random'
|
||||
%% dispatch if the key is absent and we are using `key_dispatch'.
|
||||
%% Otherwise, it'll be deterministic.
|
||||
emqx_guid:gen();
|
||||
Key ->
|
||||
Key
|
||||
end
|
||||
}
|
||||
end.
|
||||
|
||||
get_topic_key({_, Msg}, TopicTks) ->
|
||||
emqx_placeholder:proc_tmpl(TopicTks, Msg);
|
||||
get_topic_key([Query | _], TopicTks) ->
|
||||
get_topic_key(Query, TopicTks).
|
||||
|
||||
apply_template({Key, Msg} = _Req, Templates) ->
|
||||
%% return a message data and its context,
|
||||
%% {binary(), rocketmq_producers:produce_context()})
|
||||
apply_template({Key, Msg} = _Req, Templates, DispatchStrategy) ->
|
||||
{
|
||||
case maps:get(Key, Templates, undefined) of
|
||||
undefined ->
|
||||
emqx_utils_json:encode(Msg);
|
||||
Template ->
|
||||
emqx_placeholder:proc_tmpl(Template, Msg)
|
||||
end,
|
||||
DispatchStrategy(Msg)
|
||||
};
|
||||
apply_template([{Key, _} | _] = Reqs, Templates, DispatchStrategy) ->
|
||||
case maps:get(Key, Templates, undefined) of
|
||||
undefined ->
|
||||
emqx_utils_json:encode(Msg);
|
||||
[{emqx_utils_json:encode(Msg), DispatchStrategy(Msg)} || {_, Msg} <- Reqs];
|
||||
Template ->
|
||||
emqx_placeholder:proc_tmpl(Template, Msg)
|
||||
end;
|
||||
apply_template([{Key, _} | _] = Reqs, Templates) ->
|
||||
case maps:get(Key, Templates, undefined) of
|
||||
undefined ->
|
||||
[emqx_utils_json:encode(Msg) || {_, Msg} <- Reqs];
|
||||
Template ->
|
||||
[emqx_placeholder:proc_tmpl(Template, Msg) || {_, Msg} <- Reqs]
|
||||
[
|
||||
{emqx_placeholder:proc_tmpl(Template, Msg), DispatchStrategy(Msg)}
|
||||
|| {_, Msg} <- Reqs
|
||||
]
|
||||
end.
|
||||
|
||||
client_id(ResourceId) ->
|
||||
|
@ -377,6 +418,10 @@ acl_info(AccessKey, SecretKey, SecurityToken) when is_binary(AccessKey) ->
|
|||
acl_info(_, _, _) ->
|
||||
#{}.
|
||||
|
||||
namespace(ClientCfg, Config) ->
|
||||
Namespace = maps:get(namespace, Config, <<>>),
|
||||
ClientCfg#{namespace => Namespace}.
|
||||
|
||||
create_producers_map(ClientId) ->
|
||||
_ = ets:new(ClientId, [public, named_table, {read_concurrency, true}]),
|
||||
ok.
|
||||
|
|
|
@ -59,4 +59,7 @@ config_connector.desc:
|
|||
config_connector.label:
|
||||
"""RocketMQ Client Configuration"""
|
||||
|
||||
strategy.desc:
|
||||
"""Producer key dispatch strategy, the default is `roundrobin`, also supports placeholders, such as: `clientid`, `messageid`, `username`."""
|
||||
|
||||
}
|
||||
|
|
|
@ -50,4 +50,10 @@ topic.desc:
|
|||
topic.label:
|
||||
"""RocketMQ Topic"""
|
||||
|
||||
namespace.desc:
|
||||
"""The namespace field MUST be set if you are using the RocketMQ service in
|
||||
aliyun cloud and also the namespace is enabled,
|
||||
or if you have configured a namespace in your RocketMQ server.
|
||||
For RocketMQ in aliyun cloud, the namespace is the instance ID."""
|
||||
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue