diff --git a/Makefile b/Makefile
index faa866753..fc87f1d95 100644
--- a/Makefile
+++ b/Makefile
@@ -7,7 +7,7 @@ export EMQX_DEFAULT_RUNNER = debian:11-slim
export OTP_VSN ?= $(shell $(CURDIR)/scripts/get-otp-vsn.sh)
export ELIXIR_VSN ?= $(shell $(CURDIR)/scripts/get-elixir-vsn.sh)
export EMQX_DASHBOARD_VERSION ?= v1.1.5
-export EMQX_EE_DASHBOARD_VERSION ?= e1.0.1-beta.12
+export EMQX_EE_DASHBOARD_VERSION ?= e1.0.1-beta.13
export EMQX_REL_FORM ?= tgz
export QUICER_DOWNLOAD_FROM_RELEASE = 1
ifeq ($(OS),Windows_NT)
diff --git a/apps/emqx/include/emqx_placeholder.hrl b/apps/emqx/include/emqx_placeholder.hrl
index c4d235caf..d5da3fb18 100644
--- a/apps/emqx/include/emqx_placeholder.hrl
+++ b/apps/emqx/include/emqx_placeholder.hrl
@@ -17,6 +17,8 @@
-ifndef(EMQX_PLACEHOLDER_HRL).
-define(EMQX_PLACEHOLDER_HRL, true).
+-define(PH_VAR_THIS, <<"$_THIS_">>).
+
-define(PH(Type), <<"${", Type/binary, "}">>).
%% action: publish/subscribe/all
diff --git a/apps/emqx_plugin_libs/src/emqx_placeholder.erl b/apps/emqx_plugin_libs/src/emqx_placeholder.erl
index 3b15a389d..3e98fa149 100644
--- a/apps/emqx_plugin_libs/src/emqx_placeholder.erl
+++ b/apps/emqx_plugin_libs/src/emqx_placeholder.erl
@@ -39,6 +39,8 @@
sql_data/1
]).
+-include_lib("emqx/include/emqx_placeholder.hrl").
+
-define(EX_PLACE_HOLDER, "(\\$\\{[a-zA-Z0-9\\._]+\\})").
-define(EX_PLACE_HOLDER_DOUBLE_QUOTE, "(\\$\\{[a-zA-Z0-9\\._]+\\}|\"\\$\\{[a-zA-Z0-9\\._]+\\}\")").
@@ -233,9 +235,6 @@ proc_param_str(Tokens, Data, Quote) ->
proc_tmpl(Tokens, Data, #{return => rawlist, var_trans => Quote})
).
-%% backward compatibility for hot upgrading from =< e4.2.1
-get_phld_var(Fun, Data) when is_function(Fun) ->
- Fun(Data);
get_phld_var(Phld, Data) ->
emqx_rule_maps:nested_get(Phld, Data).
@@ -298,9 +297,12 @@ replace_with(Tmpl, RE, '$n') ->
Parts
),
Res.
-
+parse_nested(<<".", R/binary>>) ->
+ %% ignore the root .
+ parse_nested(R);
parse_nested(Attr) ->
case string:split(Attr, <<".">>, all) of
+ [<<>>] -> {var, ?PH_VAR_THIS};
[Attr] -> {var, Attr};
Nested -> {path, [{key, P} || P <- Nested]}
end.
diff --git a/apps/emqx_rule_engine/src/emqx_rule_maps.erl b/apps/emqx_rule_engine/src/emqx_rule_maps.erl
index 3e0ebc72d..13f99c88b 100644
--- a/apps/emqx_rule_engine/src/emqx_rule_maps.erl
+++ b/apps/emqx_rule_engine/src/emqx_rule_maps.erl
@@ -26,9 +26,13 @@
unsafe_atom_key_map/1
]).
+-include_lib("emqx/include/emqx_placeholder.hrl").
+
nested_get(Key, Data) ->
nested_get(Key, Data, undefined).
+nested_get({var, ?PH_VAR_THIS}, Data, _Default) ->
+ Data;
nested_get({var, Key}, Data, Default) ->
general_map_get({key, Key}, Data, Data, Default);
nested_get({path, Path}, Data, Default) when is_list(Path) ->
diff --git a/lib-ee/emqx_ee_bridge/i18n/emqx_ee_bridge_kafka.conf b/lib-ee/emqx_ee_bridge/i18n/emqx_ee_bridge_kafka.conf
index 02f979449..be1e581bd 100644
--- a/lib-ee/emqx_ee_bridge/i18n/emqx_ee_bridge_kafka.conf
+++ b/lib-ee/emqx_ee_bridge/i18n/emqx_ee_bridge_kafka.conf
@@ -250,9 +250,10 @@ emqx_ee_bridge_kafka {
kafka_message_key {
desc {
en: "Template to render Kafka message key. "
- "If the desired variable for this template is not found in the input data "
- "NULL
is used."
- zh: "生成 Kafka 消息 Key 的模版。当所需要的输入没有时,会使用 NULL
。"
+ "If the template is rendered into a NULL value (i.e. there is no such data field in Rule Engine context) "
+ "then Kafka's NULL
(but not empty string) is used."
+ zh: "生成 Kafka 消息 Key 的模版。如果模版生成后为空值,"
+ "则会使用 Kafka 的 NULL
,而非空字符串。"
}
label {
en: "Message Key"
@@ -262,9 +263,11 @@ emqx_ee_bridge_kafka {
kafka_message_value {
desc {
en: "Template to render Kafka message value. "
- "If the desired variable for this template is not found in the input data "
- "NULL
is used."
- zh: "生成 Kafka 消息 Value 的模版。当所需要的输入没有时,会使用 NULL
。"
+ "If the template is rendered "
+ "into a NULL value (i.e. there is no such data field in Rule Engine context) "
+ "then Kafka's NULL
(but not empty string) is used."
+ zh: "生成 Kafka 消息 Value 的模版。如果模版生成后为空值,"
+ "则会使用 Kafka 的 NULL
,而非空字符串。"
}
label {
en: "Message Value"
diff --git a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_kafka.erl b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_kafka.erl
index 9fae4f30a..4a6c1411c 100644
--- a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_kafka.erl
+++ b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_kafka.erl
@@ -207,11 +207,11 @@ fields(producer_kafka_opts) ->
];
fields(kafka_message) ->
[
- {key, mk(string(), #{default => "${clientid}", desc => ?DESC(kafka_message_key)})},
- {value, mk(string(), #{default => "${payload}", desc => ?DESC(kafka_message_value)})},
+ {key, mk(string(), #{default => "${.clientid}", desc => ?DESC(kafka_message_key)})},
+ {value, mk(string(), #{default => "${.}", desc => ?DESC(kafka_message_value)})},
{timestamp,
mk(string(), #{
- default => "${timestamp}", desc => ?DESC(kafka_message_timestamp)
+ default => "${.timestamp}", desc => ?DESC(kafka_message_timestamp)
})}
];
fields(producer_buffer) ->
diff --git a/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_producer.erl b/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_producer.erl
index a7de100d6..d7b4de0c0 100644
--- a/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_producer.erl
+++ b/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_producer.erl
@@ -145,15 +145,19 @@ on_query(_InstId, {send_message, Message}, #{message_template := Template, produ
{_Partition, _Pid} = wolff:send(Producers, [KafkaMessage], {fun ?MODULE:on_kafka_ack/3, [#{}]}),
{async_return, ok}.
-compile_message_template(#{
- key := KeyTemplate, value := ValueTemplate, timestamp := TimestampTemplate
-}) ->
+compile_message_template(T) ->
+ KeyTemplate = maps:get(key, T, <<"${.clientid}">>),
+ ValueTemplate = maps:get(value, T, <<"${.}">>),
+ TimestampTemplate = maps:get(value, T, <<"${.timestamp}">>),
#{
- key => emqx_plugin_libs_rule:preproc_tmpl(KeyTemplate),
- value => emqx_plugin_libs_rule:preproc_tmpl(ValueTemplate),
- timestamp => emqx_plugin_libs_rule:preproc_tmpl(TimestampTemplate)
+ key => preproc_tmpl(KeyTemplate),
+ value => preproc_tmpl(ValueTemplate),
+ timestamp => preproc_tmpl(TimestampTemplate)
}.
+preproc_tmpl(Tmpl) ->
+ emqx_plugin_libs_rule:preproc_tmpl(Tmpl).
+
render_message(
#{key := KeyTemplate, value := ValueTemplate, timestamp := TimestampTemplate}, Message
) ->
@@ -164,7 +168,14 @@ render_message(
}.
render(Template, Message) ->
- emqx_plugin_libs_rule:proc_tmpl(Template, Message).
+ Opts = #{
+ var_trans => fun
+ (undefined) -> <<"">>;
+ (X) -> emqx_plugin_libs_rule:bin(X)
+ end,
+ return => full_binary
+ },
+ emqx_plugin_libs_rule:proc_tmpl(Template, Message, Opts).
render_timestamp(Template, Message) ->
try
diff --git a/lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_producer_SUITE.erl b/lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_producer_SUITE.erl
index 8e5b1fa95..2d67a9941 100644
--- a/lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_producer_SUITE.erl
+++ b/lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_producer_SUITE.erl
@@ -260,7 +260,11 @@ kafka_bridge_rest_api_helper(Config) ->
topic => <<"t/#">>
},
<<"kafka">> => #{
- <<"topic">> => erlang:list_to_binary(KafkaTopic)
+ <<"topic">> => erlang:list_to_binary(KafkaTopic),
+ <<"message">> => #{
+ <<"key">> => <<"${clientid}">>,
+ <<"value">> => <<"${.payload}">>
+ }
}
}
},
@@ -501,6 +505,7 @@ producer = {
}
kafka = {
topic = \"{{ kafka_topic }}\"
+ message = {key = \"${clientid}\", value = \"${.payload}\"}
}
}
""".