From b812f9af5a481f8927fe6f25404eec143a73f06b Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Tue, 18 Apr 2023 17:42:39 +0300 Subject: [PATCH] feat(tpl): use `emqx_connector_template` in `emqx_rule_engine` app --- .../src/emqx_connector_template.erl | 1 + .../src/emqx_rule_actions.erl | 103 +++++++++--------- .../test/emqx_rule_engine_SUITE.erl | 5 +- 3 files changed, 57 insertions(+), 52 deletions(-) diff --git a/apps/emqx_connector/src/emqx_connector_template.erl b/apps/emqx_connector/src/emqx_connector_template.erl index 4f583573c..bb26edec1 100644 --- a/apps/emqx_connector/src/emqx_connector_template.erl +++ b/apps/emqx_connector/src/emqx_connector_template.erl @@ -30,6 +30,7 @@ -export([render_strict/2]). -export([render_strict/3]). +-export([lookup_var/2]). -export([to_string/1]). -export_type([t/0]). diff --git a/apps/emqx_rule_engine/src/emqx_rule_actions.erl b/apps/emqx_rule_engine/src/emqx_rule_actions.erl index 276f8d0e0..bb9966b4a 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_actions.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_actions.erl @@ -65,23 +65,18 @@ pre_process_action_args( qos := QoS, retain := Retain, payload := Payload, - mqtt_properties := MQTTPropertiesTemplate0, - user_properties := UserPropertiesTemplate + mqtt_properties := MQTTProperties, + user_properties := UserProperties } = Args ) -> - MQTTPropertiesTemplate = - maps:map( - fun(_Key, V) -> emqx_placeholder:preproc_tmpl(V) end, - MQTTPropertiesTemplate0 - ), Args#{ preprocessed_tmpl => #{ - topic => emqx_placeholder:preproc_tmpl(Topic), - qos => preproc_vars(QoS), - retain => preproc_vars(Retain), - payload => emqx_placeholder:preproc_tmpl(Payload), - mqtt_properties => MQTTPropertiesTemplate, - user_properties => preproc_user_properties(UserPropertiesTemplate) + topic => emqx_connector_template:parse(Topic), + qos => parse_vars(QoS), + retain => parse_vars(Retain), + payload => parse_payload(Payload), + mqtt_properties => parse_mqtt_properties(MQTTProperties), + user_properties => parse_user_properties(UserProperties) } }; pre_process_action_args(_, Args) -> @@ -114,25 +109,27 @@ republish( #{metadata := #{rule_id := RuleId}} = Env, #{ preprocessed_tmpl := #{ - qos := QoSTks, - retain := RetainTks, - topic := TopicTks, - payload := PayloadTks, + qos := QoSTemplate, + retain := RetainTemplate, + topic := TopicTemplate, + payload := PayloadTemplate, mqtt_properties := MQTTPropertiesTemplate, - user_properties := UserPropertiesTks + user_properties := UserPropertiesTemplate } } ) -> - Topic = emqx_placeholder:proc_tmpl(TopicTks, Selected), - Payload = format_msg(PayloadTks, Selected), - QoS = replace_simple_var(QoSTks, Selected, 0), - Retain = replace_simple_var(RetainTks, Selected, false), + Topic = unicode:characters_to_binary( + emqx_connector_template:render_strict(TopicTemplate, Selected) + ), + Payload = emqx_connector_template:render_strict(PayloadTemplate, Selected), + QoS = render_simple_var(QoSTemplate, Selected, 0), + Retain = render_simple_var(RetainTemplate, Selected, false), %% 'flags' is set for message re-publishes or message related %% events such as message.acked and message.dropped Flags0 = maps:get(flags, Env, #{}), Flags = Flags0#{retain => Retain}, - PubProps0 = format_pub_props(UserPropertiesTks, Selected, Env), - MQTTProps = format_mqtt_properties(MQTTPropertiesTemplate, Selected, Env), + PubProps0 = render_pub_props(UserPropertiesTemplate, Selected, Env), + MQTTProps = render_mqtt_properties(MQTTPropertiesTemplate, Selected, Env), PubProps = maps:merge(PubProps0, MQTTProps), ?TRACE( "RULE", @@ -203,58 +200,66 @@ safe_publish(RuleId, Topic, QoS, Flags, Payload, PubProps) -> _ = emqx_broker:safe_publish(Msg), emqx_metrics:inc_msg(Msg). -preproc_vars(Data) when is_binary(Data) -> - emqx_placeholder:preproc_tmpl(Data); -preproc_vars(Data) -> - Data. +parse_vars(Data) when is_binary(Data) -> + emqx_connector_template:parse(Data); +parse_vars(Data) -> + {const, Data}. -preproc_user_properties(<<"${pub_props.'User-Property'}">>) -> +parse_mqtt_properties(MQTTPropertiesTemplate) -> + maps:map( + fun(_Key, V) -> emqx_connector_template:parse(V) end, + MQTTPropertiesTemplate + ). + +parse_user_properties(<<"${pub_props.'User-Property'}">>) -> %% keep the original %% avoid processing this special variable because %% we do not want to force users to select the value %% the value will be taken from Env.pub_props directly ?ORIGINAL_USER_PROPERTIES; -preproc_user_properties(<<"${", _/binary>> = V) -> +parse_user_properties(<<"${", _/binary>> = V) -> %% use a variable - emqx_placeholder:preproc_tmpl(V); -preproc_user_properties(_) -> + emqx_connector_template:parse(V); +parse_user_properties(_) -> %% invalid, discard undefined. -replace_simple_var(Tokens, Data, Default) when is_list(Tokens) -> - [Var] = emqx_placeholder:proc_tmpl(Tokens, Data, #{return => rawlist}), - case Var of +render_simple_var([{var, Name}], Data, Default) -> + case emqx_connector_template:lookup_var(Name, Data) of + {ok, Var} -> Var; %% cannot find the variable from Data - undefined -> Default; - _ -> Var + {error, _} -> Default end; -replace_simple_var(Val, _Data, _Default) -> +render_simple_var({const, Val}, _Data, _Default) -> Val. -format_msg([], Selected) -> - emqx_utils_json:encode(Selected); -format_msg(Tokens, Selected) -> - emqx_placeholder:proc_tmpl(Tokens, Selected). +parse_payload(Payload) -> + case string:is_empty(Payload) of + false -> emqx_connector_template:parse(Payload); + true -> emqx_connector_template:parse("${.}") + end. -format_pub_props(UserPropertiesTks, Selected, Env) -> +render_pub_props(UserPropertiesTemplate, Selected, Env) -> UserProperties = - case UserPropertiesTks of + case UserPropertiesTemplate of ?ORIGINAL_USER_PROPERTIES -> maps:get('User-Property', maps:get(pub_props, Env, #{}), #{}); undefined -> #{}; _ -> - replace_simple_var(UserPropertiesTks, Selected, #{}) + render_simple_var(UserPropertiesTemplate, Selected, #{}) end, #{'User-Property' => UserProperties}. -format_mqtt_properties(MQTTPropertiesTemplate, Selected, Env) -> +render_mqtt_properties(MQTTPropertiesTemplate, Selected, Env) -> #{metadata := #{rule_id := RuleId}} = Env, - MQTTProperties0 = + MQTTProperties = maps:fold( fun(K, Template, Acc) -> try - V = emqx_placeholder:proc_tmpl(Template, Selected), + V = unicode:characters_to_binary( + emqx_connector_template:render_strict(Template, Selected) + ), Acc#{K => V} catch Kind:Error -> @@ -275,7 +280,7 @@ format_mqtt_properties(MQTTPropertiesTemplate, Selected, Env) -> #{}, MQTTPropertiesTemplate ), - coerce_properties_values(MQTTProperties0, Env). + coerce_properties_values(MQTTProperties, Env). ensure_int(B) when is_binary(B) -> try diff --git a/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl b/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl index 00ca68264..fcb04f9b3 100644 --- a/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl +++ b/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl @@ -1364,14 +1364,13 @@ t_sqlselect_inject_props(_Config) -> actions => [Repub] } ), - Props = user_properties(#{<<"inject_key">> => <<"inject_val">>}), {ok, Client} = emqtt:start_link([{username, <<"emqx">>}, {proto_ver, v5}]), {ok, _} = emqtt:connect(Client), {ok, _, _} = emqtt:subscribe(Client, <<"t2">>, 0), emqtt:publish(Client, <<"t1">>, #{}, <<"{\"x\":1}">>, [{qos, 0}]), receive - {publish, #{topic := T, payload := Payload, properties := Props2}} -> - ?assertEqual(Props, Props2), + {publish, #{topic := T, payload := Payload, properties := Props}} -> + ?assertEqual(user_properties(#{<<"inject_key">> => <<"inject_val">>}), Props), ?assertEqual(<<"t2">>, T), ?assertEqual(<<"{\"x\":1}">>, Payload) after 2000 ->