docs: improve kafka key and value field description
This commit is contained in:
parent
7073c62dd9
commit
47414e0d53
|
@ -298,7 +298,7 @@ replace_with(Tmpl, RE, '$n') ->
|
||||||
),
|
),
|
||||||
Res.
|
Res.
|
||||||
parse_nested(<<".", R/binary>>) ->
|
parse_nested(<<".", R/binary>>) ->
|
||||||
%% ignroe the root .
|
%% ignore the root .
|
||||||
parse_nested(R);
|
parse_nested(R);
|
||||||
parse_nested(Attr) ->
|
parse_nested(Attr) ->
|
||||||
case string:split(Attr, <<".">>, all) of
|
case string:split(Attr, <<".">>, all) of
|
||||||
|
|
|
@ -250,9 +250,10 @@ emqx_ee_bridge_kafka {
|
||||||
kafka_message_key {
|
kafka_message_key {
|
||||||
desc {
|
desc {
|
||||||
en: "Template to render Kafka message key. "
|
en: "Template to render Kafka message key. "
|
||||||
"If the desired variable for this template is not found in the input data "
|
"If the template is rendered into a NULL value (i.e. there is no such data field in Rule Engine context) "
|
||||||
"<code>NULL</code> is used."
|
"then Kafka's <code>NULL</code> (but not empty string) is used."
|
||||||
zh: "生成 Kafka 消息 Key 的模版。当所需要的输入没有时,会使用 <code>NULL</code>。"
|
zh: "生成 Kafka 消息 Key 的模版。如果模版生成后为空值,"
|
||||||
|
"则会使用 Kafka 的 <code>NULL</code> ,而非空字符串。"
|
||||||
}
|
}
|
||||||
label {
|
label {
|
||||||
en: "Message Key"
|
en: "Message Key"
|
||||||
|
@ -262,9 +263,11 @@ emqx_ee_bridge_kafka {
|
||||||
kafka_message_value {
|
kafka_message_value {
|
||||||
desc {
|
desc {
|
||||||
en: "Template to render Kafka message value. "
|
en: "Template to render Kafka message value. "
|
||||||
"If the desired variable for this template is not found in the input data "
|
"If the template is rendered "
|
||||||
"<code>NULL</code> is used."
|
"into a NULL value (i.e. there is no such data field in Rule Engine context) "
|
||||||
zh: "生成 Kafka 消息 Value 的模版。当所需要的输入没有时,会使用 <code>NULL</code>。"
|
"then Kafka's <code>NULL</code> (but not empty string) is used."
|
||||||
|
zh: "生成 Kafka 消息 Value 的模版。如果模版生成后为空值,"
|
||||||
|
"则会使用 Kafka 的 <code>NULL</code>,而非空字符串。"
|
||||||
}
|
}
|
||||||
label {
|
label {
|
||||||
en: "Message Value"
|
en: "Message Value"
|
||||||
|
|
|
@ -207,11 +207,11 @@ fields(producer_kafka_opts) ->
|
||||||
];
|
];
|
||||||
fields(kafka_message) ->
|
fields(kafka_message) ->
|
||||||
[
|
[
|
||||||
{key, mk(string(), #{default => "${clientid}", desc => ?DESC(kafka_message_key)})},
|
{key, mk(string(), #{default => "${.clientid}", desc => ?DESC(kafka_message_key)})},
|
||||||
{value, mk(string(), #{default => "${payload}", desc => ?DESC(kafka_message_value)})},
|
{value, mk(string(), #{default => "${.}", desc => ?DESC(kafka_message_value)})},
|
||||||
{timestamp,
|
{timestamp,
|
||||||
mk(string(), #{
|
mk(string(), #{
|
||||||
default => "${timestamp}", desc => ?DESC(kafka_message_timestamp)
|
default => "${.timestamp}", desc => ?DESC(kafka_message_timestamp)
|
||||||
})}
|
})}
|
||||||
];
|
];
|
||||||
fields(producer_buffer) ->
|
fields(producer_buffer) ->
|
||||||
|
|
|
@ -145,15 +145,19 @@ on_query(_InstId, {send_message, Message}, #{message_template := Template, produ
|
||||||
{_Partition, _Pid} = wolff:send(Producers, [KafkaMessage], {fun ?MODULE:on_kafka_ack/3, [#{}]}),
|
{_Partition, _Pid} = wolff:send(Producers, [KafkaMessage], {fun ?MODULE:on_kafka_ack/3, [#{}]}),
|
||||||
{async_return, ok}.
|
{async_return, ok}.
|
||||||
|
|
||||||
compile_message_template(#{
|
compile_message_template(T) ->
|
||||||
key := KeyTemplate, value := ValueTemplate, timestamp := TimestampTemplate
|
KeyTemplate = maps:get(key, T, <<"${.clientid}">>),
|
||||||
}) ->
|
ValueTemplate = maps:get(value, T, <<"${.}">>),
|
||||||
|
TimestampTemplate = maps:get(value, T, <<"${.timestamp}">>),
|
||||||
#{
|
#{
|
||||||
key => emqx_plugin_libs_rule:preproc_tmpl(KeyTemplate),
|
key => preproc_tmpl(KeyTemplate),
|
||||||
value => emqx_plugin_libs_rule:preproc_tmpl(ValueTemplate),
|
value => preproc_tmpl(ValueTemplate),
|
||||||
timestamp => emqx_plugin_libs_rule:preproc_tmpl(TimestampTemplate)
|
timestamp => preproc_tmpl(TimestampTemplate)
|
||||||
}.
|
}.
|
||||||
|
|
||||||
|
preproc_tmpl(Tmpl) ->
|
||||||
|
emqx_plugin_libs_rule:preproc_tmpl(Tmpl).
|
||||||
|
|
||||||
render_message(
|
render_message(
|
||||||
#{key := KeyTemplate, value := ValueTemplate, timestamp := TimestampTemplate}, Message
|
#{key := KeyTemplate, value := ValueTemplate, timestamp := TimestampTemplate}, Message
|
||||||
) ->
|
) ->
|
||||||
|
@ -164,7 +168,14 @@ render_message(
|
||||||
}.
|
}.
|
||||||
|
|
||||||
render(Template, Message) ->
|
render(Template, Message) ->
|
||||||
emqx_plugin_libs_rule:proc_tmpl(Template, Message).
|
Opts = #{
|
||||||
|
var_trans => fun
|
||||||
|
(undefined) -> <<"">>;
|
||||||
|
(X) -> emqx_plugin_libs_rule:bin(X)
|
||||||
|
end,
|
||||||
|
return => full_binary
|
||||||
|
},
|
||||||
|
emqx_plugin_libs_rule:proc_tmpl(Template, Message, Opts).
|
||||||
|
|
||||||
render_timestamp(Template, Message) ->
|
render_timestamp(Template, Message) ->
|
||||||
try
|
try
|
||||||
|
|
|
@ -260,7 +260,11 @@ kafka_bridge_rest_api_helper(Config) ->
|
||||||
topic => <<"t/#">>
|
topic => <<"t/#">>
|
||||||
},
|
},
|
||||||
<<"kafka">> => #{
|
<<"kafka">> => #{
|
||||||
<<"topic">> => erlang:list_to_binary(KafkaTopic)
|
<<"topic">> => erlang:list_to_binary(KafkaTopic),
|
||||||
|
<<"message">> => #{
|
||||||
|
<<"key">> => <<"${clientid}">>,
|
||||||
|
<<"value">> => <<"${.payload}">>
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
|
@ -501,6 +505,7 @@ producer = {
|
||||||
}
|
}
|
||||||
kafka = {
|
kafka = {
|
||||||
topic = \"{{ kafka_topic }}\"
|
topic = \"{{ kafka_topic }}\"
|
||||||
|
message = {key = \"${clientid}\", value = \"${.payload}\"}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
""".
|
""".
|
||||||
|
|
Loading…
Reference in New Issue